[improvement](disk balance) Impr disk rebalancer sched when be load r… (#26412)
This commit is contained in:
@ -220,6 +220,11 @@ public class Replica implements Writable {
|
||||
return this.backendId;
|
||||
}
|
||||
|
||||
// just for ut
|
||||
/**
 * Overwrites the backend id stored on this replica.
 * Intended for unit tests only.
 *
 * @param backendId the backend id to store on this replica
 */
public void setBackendId(long backendId) {
|
||||
this.backendId = backendId;
|
||||
}
|
||||
|
||||
/**
 * Returns the value of this replica's {@code dataSize} field.
 *
 * @return the stored data size (NOTE(review): unit is presumably bytes; confirm against the writer of this field)
 */
public long getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
@ -372,4 +372,5 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
|
||||
"beload waiting for dest backend slot");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -30,7 +30,6 @@ import org.apache.doris.clone.SchedException.SubCode;
|
||||
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -46,7 +45,6 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/*
|
||||
|
||||
* This DiskBalancer is different from other Balancers which take care of cluster-wide data balancing.
|
||||
@ -129,14 +127,16 @@ public class DiskRebalancer extends Rebalancer {
|
||||
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
|
||||
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
|
||||
|
||||
if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
|
||||
PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv()
|
||||
.getTabletScheduler().getRebalancer();
|
||||
if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) {
|
||||
midBEs.addAll(lowBEs);
|
||||
midBEs.addAll(highBEs);
|
||||
}
|
||||
} else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
|
||||
Rebalancer rebalancer = FeConstants.runningUnitTest ? null
|
||||
: Env.getCurrentEnv().getTabletScheduler().getRebalancer();
|
||||
if (rebalancer != null && rebalancer.unPickOverLongTime(clusterStat.getTag(), medium)) {
|
||||
midBEs.addAll(lowBEs);
|
||||
midBEs.addAll(highBEs);
|
||||
lowBEs.clear();
|
||||
highBEs.clear();
|
||||
}
|
||||
|
||||
if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
|
||||
// the cluster is not balanced
|
||||
if (prioBackends.isEmpty()) {
|
||||
LOG.info("cluster is not balanced with medium: {}. skip", medium);
|
||||
|
||||
@ -66,8 +66,6 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
|
||||
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
|
||||
|
||||
private long cacheEmptyTimestamp = -1L;
|
||||
|
||||
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
super(infoService, invertedIndex, backendsWorkingSlots);
|
||||
@ -234,11 +232,6 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
return !bes.contains(move.fromBe) && bes.contains(move.toBe);
|
||||
}
|
||||
|
||||
// cache empty for 10 min
|
||||
public boolean checkCacheEmptyForLong() {
|
||||
return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completeSchedCtx(TabletSchedCtx tabletCtx)
|
||||
throws SchedException {
|
||||
@ -331,11 +324,6 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
|
||||
// Perform cache maintenance
|
||||
movesCacheMap.maintain();
|
||||
if (movesCacheMap.size() > 0) {
|
||||
cacheEmptyTimestamp = -1;
|
||||
} else if (cacheEmptyTimestamp < 0) {
|
||||
cacheEmptyTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
LOG.debug("Move succeeded/total :{}/{}, current {}",
|
||||
counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
|
||||
}
|
||||
|
||||
@ -25,8 +25,12 @@ import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -46,6 +50,7 @@ import java.util.Map;
|
||||
* 2. If you want to make sure the move has succeeded, you can assume that it has succeeded when getToDeleteReplicaId is called.
|
||||
*/
|
||||
public abstract class Rebalancer {
|
||||
private static final Logger LOG = LogManager.getLogger(Rebalancer.class);
|
||||
// When the Rebalancer is initialized, the statisticMap is usually empty, so it does not need to be an arg.
|
||||
// Only use updateLoadStatistic() to load stats.
|
||||
protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
|
||||
@ -55,6 +60,14 @@ public abstract class Rebalancer {
|
||||
// be id -> end time of prio
|
||||
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
|
||||
|
||||
// tag -> (medium, timestamp)
|
||||
private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();
|
||||
|
||||
// for ut
|
||||
/**
 * Exposes the table of last pick times, keyed by tag and storage medium.
 * The internal table itself is returned, not a copy, so callers can modify it.
 * Intended for unit tests.
 *
 * @return the live {@code lastPickTimeTable} instance
 */
public Table<Tag, TStorageMedium, Long> getLastPickTimeTable() {
|
||||
return lastPickTimeTable;
|
||||
}
|
||||
|
||||
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
|
||||
Map<Long, PathSlot> backendsWorkingSlots) {
|
||||
this.infoService = infoService;
|
||||
@ -66,7 +79,12 @@ public abstract class Rebalancer {
|
||||
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
|
||||
for (Map.Entry<Tag, LoadStatisticForTag> entry : statisticMap.entrySet()) {
|
||||
for (TStorageMedium medium : TStorageMedium.values()) {
|
||||
alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getValue(), medium));
|
||||
List<TabletSchedCtx> candidates =
|
||||
selectAlternativeTabletsForCluster(entry.getValue(), medium);
|
||||
alternativeTablets.addAll(candidates);
|
||||
if (!candidates.isEmpty()) {
|
||||
lastPickTimeTable.put(entry.getKey(), medium, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
}
|
||||
return alternativeTablets;
|
||||
@ -77,6 +95,14 @@ public abstract class Rebalancer {
|
||||
/**
 * Selects candidate tablets for one load-statistic entry and one storage medium.
 * The caller in this class invokes it for every {@code statisticMap} entry combined with
 * every {@code TStorageMedium} value and adds the returned contexts to its overall result.
 *
 * @param clusterStat the load statistic of the tag being examined
 * @param medium the storage medium being examined
 * @return the tablet contexts chosen for this tag and medium
 *         (NOTE(review): the visible caller passes the result to {@code addAll}, so
 *         implementations presumably must not return {@code null}; confirm)
 */
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
|
||||
LoadStatisticForTag clusterStat, TStorageMedium medium);
|
||||
|
||||
// 5mins
|
||||
protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) {
|
||||
Long lastPickTime = lastPickTimeTable.get(tag, medium);
|
||||
Long now = System.currentTimeMillis();
|
||||
LOG.debug("tag={}, medium={}, lastPickTime={}, now={}", tag, medium, lastPickTime, now);
|
||||
return lastPickTime == null || now - lastPickTime >= 5 * 60 * 1000L;
|
||||
}
|
||||
|
||||
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
|
||||
throws SchedException {
|
||||
completeSchedCtx(tabletCtx);
|
||||
|
||||
@ -171,6 +171,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return stat;
|
||||
}
|
||||
|
||||
// just return be or partition rebalancer
|
||||
/**
 * Returns the rebalancer held in this scheduler's {@code rebalancer} field.
 *
 * @return the current {@code rebalancer} reference, returned as-is
 */
public Rebalancer getRebalancer() {
|
||||
return rebalancer;
|
||||
}
|
||||
@ -274,6 +275,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return AddResult.ADDED;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
 * Checks whether {@code allTabletTypes} has an entry for the given tablet id.
 * The lookup runs while holding this scheduler's monitor (the method is {@code synchronized}).
 *
 * @param tabletId the id of the tablet to look up
 * @return {@code true} if {@code allTabletTypes} contains the id as a key
 */
public synchronized boolean containsTablet(long tabletId) {
|
||||
return allTabletTypes.containsKey(tabletId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user