[Rebalancer] support partition rebalancer (#5010)

RebalancerType could be configured via Config.rebalancer_type(BeLoad, Partition).
PartitionRebalancer is based on TwoDimensionalGreedyAlgo.
Two dims of Doris should be cluster & partition. And we only consider about the replica count, 
do not consider replica size.
#4845 for further details.
This commit is contained in:
HuangWei
2020-12-31 09:41:38 +08:00
committed by GitHub
parent fd6fb90a5a
commit d7a584ac59
14 changed files with 1668 additions and 43 deletions

View File

@ -536,7 +536,7 @@ public class Catalog {
this.metaContext.setThreadLocalInfo();
this.stat = new TabletSchedulerStat();
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat, Config.tablet_rebalancer_type);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size,

View File

@ -34,8 +34,10 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -66,10 +68,10 @@ public class TabletInvertedIndex {
// tablet id -> tablet meta
private Map<Long, TabletMeta> tabletMetaMap = Maps.newHashMap();
// replica id -> tablet id
private Map<Long, Long> replicaToTabletMap = Maps.newHashMap();
/*
* we use this to save memory.
* we do not need create TabletMeta instance for each tablet,
@ -80,7 +82,7 @@ public class TabletInvertedIndex {
* partition id -> (index id -> tablet meta)
*/
private Table<Long, Long, TabletMeta> tabletMetaTable = HashBasedTable.create();
// tablet id -> (backend id -> replica)
private Table<Long, Long, Replica> replicaMetaTable = HashBasedTable.create();
// backing replica table, for visiting backend replicas faster.
@ -112,7 +114,7 @@ public class TabletInvertedIndex {
ListMultimap<Long, Long> tabletDeleteFromMeta,
Set<Long> foundTabletsWithValidSchema,
Map<Long, TTabletInfo> foundTabletsWithInvalidSchema,
ListMultimap<TStorageMedium, Long> tabletMigrationMap,
ListMultimap<TStorageMedium, Long> tabletMigrationMap,
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
@ -149,7 +151,7 @@ public class TabletInvertedIndex {
// need sync
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
}
// check and set path
// path info of replica is only saved in Master FE
if (backendTabletInfo.isSetPathHash() &&
@ -165,8 +167,8 @@ public class TabletInvertedIndex {
if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
LOG.warn("replica {} of tablet {} on backend {} need recovery. "
+ "replica in FE: {}, report version {}-{}, report schema hash: {},"
+ " is bad: {}, is version missing: {}",
+ "replica in FE: {}, report version {}-{}, report schema hash: {},"
+ " is bad: {}, is version missing: {}",
replica.getId(), tabletId, backendId, replica,
backendTabletInfo.getVersion(),
backendTabletInfo.getVersionHash(),
@ -195,7 +197,7 @@ public class TabletInvertedIndex {
TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
LOG.debug("transaction id [{}] is not valid any more, "
LOG.debug("transaction id [{}] is not valid any more, "
+ "clear it from backend [{}]", transactionId, backendId);
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId());
@ -207,13 +209,13 @@ public class TabletInvertedIndex {
* 2. FE received report and begin to assemble partitionCommitInfos.
* 3. At the same time, some of partitions have been dropped, so partitionCommitInfos does not contain these partitions.
* 4. So we will not able to get partitionCommitInfo here.
*
*
* Just print a log to observe
*/
LOG.info("failed to find partition commit info. table: {}, partition: {}, tablet: {}, txn id: {}",
tabletMeta.getTableId(), partitionId, tabletId, transactionState.getTransactionId());
} else {
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(),
partitionCommitInfo.getVersionHash());
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId());
@ -237,7 +239,7 @@ public class TabletInvertedIndex {
foundTabletsWithInvalidSchema.put(tabletId, backendTabletInfo);
} // end for be tablet info
}
} else {
} else {
// 2. (meta - be)
// may need delete from meta
LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta);
@ -251,10 +253,10 @@ public class TabletInvertedIndex {
long end = System.currentTimeMillis();
LOG.info("finished to do tablet diff with backend[{}]. sync: {}. metaDel: {}. foundValid: {}. foundInvalid: {}."
+ " migration: {}. found invalid transactions {}. found republish transactions {} "
+ " cost: {} ms", backendId, tabletSyncMap.size(),
tabletDeleteFromMeta.size(), foundTabletsWithValidSchema.size(), foundTabletsWithInvalidSchema.size(),
tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), (end - start));
+ " migration: {}. found invalid transactions {}. found republish transactions {} "
+ " cost: {} ms", backendId, tabletSyncMap.size(),
tabletDeleteFromMeta.size(), foundTabletsWithValidSchema.size(), foundTabletsWithInvalidSchema.size(),
tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), (end - start));
}
public Long getTabletIdByReplica(long replicaId) {
@ -302,7 +304,7 @@ public class TabletInvertedIndex {
long versionInFe = replicaInFe.getVersion();
long versionHashInFe = replicaInFe.getVersionHash();
if (backendTabletInfo.getVersion() > versionInFe) {
// backend replica's version is larger or newer than replica in FE, sync it.
return true;
@ -311,10 +313,10 @@ public class TabletInvertedIndex {
// backend replica's version is equal to replica in FE, but replica in FE is bad, while backend replica is good, sync it
return true;
}
return false;
}
/**
* Be will set `used' to false for bad replicas and `version_miss' to true for replicas with hole
* in their version chain. In either case, those replicas need to be fixed by TabletScheduler.
@ -350,7 +352,7 @@ public class TabletInvertedIndex {
* 2. BE will report version (X+1, 0), and FE will sync with this version, change to (X+1, 0), too.
* 3. When restore, BE will restore the replica with version (X, Y) (which is the visible version of partition)
* 4. BE report the version (X-Y), and than we fall into here
*
*
* Actually, the version (X+1, 0) is a 'virtual' version, so here we ignore this kind of report
*/
return false;
@ -455,7 +457,7 @@ public class TabletInvertedIndex {
writeUnlock();
}
}
public Replica getReplica(long tabletId, long backendId) {
readLock();
try {
@ -602,5 +604,96 @@ public class TabletInvertedIndex {
public Map<Long, Long> getReplicaToTabletMap() {
return replicaToTabletMap;
}
// Only build from available bes, exclude colocate tables
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(List<Long> availableBeIds) {
readLock();
// 1. gen <partitionId-indexId, <beId, replicaCount>>
// for each replica(all tablets):
// find beId, then replicaCount++
Map<TStorageMedium, Table<Long, Long, Map<Long, Long>>> partitionReplicasInfoMaps = Maps.newHashMap();
for (TStorageMedium medium : TStorageMedium.values()) {
partitionReplicasInfoMaps.put(medium, HashBasedTable.create());
}
try {
// Changes to the returned set will update the underlying table
// tablet id -> (backend id -> replica)
Set<Table.Cell<Long, Long, Replica>> cells = replicaMetaTable.cellSet();
for (Table.Cell<Long, Long, Replica> cell : cells) {
Long tabletId = cell.getRowKey();
Long beId = cell.getColumnKey();
try {
Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId);
Preconditions.checkState(!Catalog.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()),
"should not be the colocate table");
TStorageMedium medium = tabletMeta.getStorageMedium();
Table<Long, Long, Map<Long, Long>> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium);
Map<Long, Long> countMap = partitionReplicasInfo.get(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
if (countMap == null) {
// If one be doesn't have any replica of one partition, it should be counted too.
countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L));
}
Long count = countMap.get(beId);
countMap.put(beId, count + 1L);
partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap);
partitionReplicasInfoMaps.put(medium, partitionReplicasInfo);
} catch (IllegalStateException | NullPointerException e) {
// If the tablet or be has some problem, don't count in
LOG.debug(e.getMessage());
}
}
} finally {
readUnlock();
}
// 2. Populate ClusterBalanceInfo::table_info_by_skew
// for each PartitionId-MaterializedIndex:
// for each beId: record max_count, min_count(replicaCount)
// put <max_count-min_count, TableBalanceInfo> to table_info_by_skew
Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
for (TStorageMedium medium : TStorageMedium.values()) {
TreeMultimap<Long, PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
Set<Table.Cell<Long, Long, Map<Long, Long>>> mapCells = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet();
for (Table.Cell<Long, Long, Map<Long, Long>> cell : mapCells) {
Map<Long, Long> countMap = cell.getValue();
Preconditions.checkNotNull(countMap);
PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey());
for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
Long beID = entry.getKey();
Long replicaCount = entry.getValue();
pbi.beByReplicaCount.put(replicaCount, beID);
}
// beByReplicaCount values are natural ordering
long minCount = pbi.beByReplicaCount.keySet().first();
long maxCount = pbi.beByReplicaCount.keySet().last();
partitionInfoBySkew.put(maxCount - minCount, pbi);
}
skewMaps.put(medium, partitionInfoBySkew);
}
return skewMaps;
}
public static class PartitionBalanceInfo {
public Long partitionId;
public Long indexId;
// Natural ordering
public TreeMultimap<Long, Long> beByReplicaCount = TreeMultimap.create();
public PartitionBalanceInfo(Long partitionId, Long indexId) {
this.partitionId = partitionId;
this.indexId = indexId;
}
public PartitionBalanceInfo(PartitionBalanceInfo info) {
this.partitionId = info.partitionId;
this.indexId = info.indexId;
this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
}
}
}

View File

@ -17,6 +17,10 @@
package org.apache.doris.clone;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.TreeMultimap;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
@ -25,11 +29,6 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -59,9 +58,11 @@ public class ClusterLoadStatistic {
// storage medium -> number of backend which has this kind of medium
private Map<TStorageMedium, Integer> backendNumMap = Maps.newHashMap();
private List<BackendLoadStatistic> beLoadStatistics = Lists.newArrayList();
private Map<TStorageMedium, TreeMultimap<Long, Long>> beByTotalReplicaCountMaps = Maps.newHashMap();
private Map<TStorageMedium, TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
public ClusterLoadStatistic(String clusterName, SystemInfoService infoService,
TabletInvertedIndex invertedIndex) {
TabletInvertedIndex invertedIndex) {
this.clusterName = clusterName;
this.infoService = infoService;
this.invertedIndex = invertedIndex;
@ -90,7 +91,7 @@ public class ClusterLoadStatistic {
beLoadStatistics.add(beStatistic);
}
for (TStorageMedium medium : TStorageMedium.values()) {
avgUsedCapacityPercentMap.put(medium, totalUsedCapacityMap.getOrDefault(medium, 0L) / (double) totalCapacityMap.getOrDefault(medium, 1L));
avgReplicaNumPercentMap.put(medium, totalReplicaNumMap.getOrDefault(medium, 0L) / (double) backendNumMap.getOrDefault(medium, 1));
@ -107,6 +108,22 @@ public class ClusterLoadStatistic {
// sort be stats by mix load score
Collections.sort(beLoadStatistics, BackendLoadStatistic.MIX_COMPARATOR);
// <medium -> Multimap<totalReplicaCount -> beId>>
// Only count the available be
for (TStorageMedium medium : TStorageMedium.values()) {
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable).forEach(beStat ->
beByTotalReplicaCount.put(beStat.getReplicaNum(medium), beStat.getBeId()));
beByTotalReplicaCountMaps.put(medium, beByTotalReplicaCount);
}
// Actually the partition is [partition_id, index_id], aka pid.
// Multimap<skew -> PartitionBalanceInfo>
// PartitionBalanceInfo: <pid -> <partitionReplicaCount, beId>>
// Only count available bes here, aligned with the beByTotalReplicaCountMaps.
skewMaps = invertedIndex.buildPartitionInfoBySkew(beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable).
map(BackendLoadStatistic::getBeId).collect(Collectors.toList()));
}
/*
@ -167,10 +184,10 @@ public class ClusterLoadStatistic {
* as more balance.
*/
public boolean isMoreBalanced(long srcBeId, long destBeId, long tabletId, long tabletSize,
TStorageMedium medium) {
TStorageMedium medium) {
double currentSrcBeScore;
double currentDestBeScore;
BackendLoadStatistic srcBeStat = null;
Optional<BackendLoadStatistic> optSrcBeStat = beLoadStatistics.stream().filter(
t -> t.getBeId() == srcBeId).findFirst();
@ -179,7 +196,7 @@ public class ClusterLoadStatistic {
} else {
return false;
}
BackendLoadStatistic destBeStat = null;
Optional<BackendLoadStatistic> optDestBeStat = beLoadStatistics.stream().filter(
t -> t.getBeId() == destBeId).findFirst();
@ -208,8 +225,8 @@ public class ClusterLoadStatistic {
double newDiff = Math.abs(newSrcBeScore.score - avgLoadScoreMap.get(medium)) + Math.abs(newDestBeScore.score - avgLoadScoreMap.get(medium));
LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed."
+ " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
+ " more balanced: {}",
+ " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
+ " more balanced: {}",
tabletId, tabletSize, srcBeId, destBeId, medium, currentSrcBeScore, newSrcBeScore.score,
currentDestBeScore, newDestBeScore.score, avgLoadScoreMap.get(medium), currentDiff, newDiff,
(newDiff < currentDiff));
@ -332,4 +349,12 @@ public class ClusterLoadStatistic {
}
return sb.toString();
}
public TreeMultimap<Long, Long> getBeByTotalReplicaMap(TStorageMedium medium) {
return beByTotalReplicaCountMaps.get(medium);
}
public TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> getSkewMap(TStorageMedium medium) {
return skewMaps.get(medium);
}
}

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/*
* MovesCacheMap stores MovesCache for every cluster and medium.
* MovesCache is a simple encapsulation of Guava Cache. Use it by calling MovesCache.get().
* MovesCache's expireAfterAccess can be reset when updating the cache mapping. If expireAfterAccess reset,
* all MovesCaches will be cleared and recreated.
*/
public class MovesCacheMap {
private static final Logger LOG = LogManager.getLogger(MovesCacheMap.class);
// cluster -> medium -> MovesCache
private final Map<String, Map<TStorageMedium, MovesCache>> cacheMap = Maps.newHashMap();
private long lastExpireConfig = -1L;
// TabletId -> Pair<Move, ToDeleteReplicaId>, 'ToDeleteReplicaId == -1' means this move haven't been scheduled successfully.
public static class MovesCache {
Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> cache;
MovesCache(long duration, TimeUnit unit) {
cache = CacheBuilder.newBuilder().expireAfterAccess(duration, unit).build();
}
public Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> get() {
return cache;
}
}
// Cyclical update the cache mapping, cuz the cluster may be deleted, we should delete the corresponding cache too.
public void updateMapping(Map<String, ClusterLoadStatistic> statisticMap, long expireAfterAccessSecond) {
if (expireAfterAccessSecond > 0 && lastExpireConfig != expireAfterAccessSecond) {
LOG.debug("Reset expireAfterAccess, last {}s, now {}s. Moves will be cleared.", lastExpireConfig, expireAfterAccessSecond);
cacheMap.clear();
lastExpireConfig = expireAfterAccessSecond;
}
cacheMap.keySet().stream().filter(k -> !statisticMap.containsKey(k)).forEach(cacheMap::remove);
List<String> toAdd = statisticMap.keySet().stream().filter(k -> !cacheMap.containsKey(k)).collect(Collectors.toList());
for (String cluster : toAdd) {
Map<TStorageMedium, MovesCache> cacheMap = Maps.newHashMap();
Arrays.stream(TStorageMedium.values()).forEach(m -> cacheMap.put(m, new MovesCache(expireAfterAccessSecond, TimeUnit.SECONDS)));
this.cacheMap.put(cluster, cacheMap);
}
}
public MovesCache getCache(String clusterName, TStorageMedium medium) {
Map<TStorageMedium, MovesCache> clusterMoves = cacheMap.get(clusterName);
if (clusterMoves != null) {
return clusterMoves.get(medium);
}
return null;
}
// For each MovesCache, performs any pending maintenance operations needed by the cache.
public void maintain() {
cacheMap.values().forEach(maps -> maps.values().forEach(map -> map.get().cleanUp()));
}
public long size() {
return cacheMap.values().stream().mapToLong(maps -> maps.values().stream().mapToLong(map -> map.get().size()).sum()).sum();
}
@Override
public String toString() {
StringJoiner sj = new StringJoiner("\n", "MovesInProgress detail:\n", "");
cacheMap.forEach((key, value) -> value.forEach((k, v) -> sj.add("(" + key + "-" + k + ": " + v.get().asMap() + ")")));
return sj.toString();
}
}

View File

@ -0,0 +1,337 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/*
* PartitionRebalancer will decrease the skew of partitions. The skew of the partition is defined as the difference
* between the maximum replica count of the partition over all bes and the minimum replica count over all bes.
* Only consider about the replica count for each partition, never consider the replica size(disk usage).
*
* We use TwoDimensionalGreedyRebalanceAlgo to get partition moves(one PartitionMove is <partition id, from be, to be>).
* It prefers a move that reduce the skew of the cluster when we want to rebalance a max skew partition.
*
* selectAlternativeTabletsForCluster() must set the tablet id, so we need to select tablet for each move in this phase
* (as TabletMove).
*/
public class PartitionRebalancer extends Rebalancer {
private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class);
private final TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
private final MovesCacheMap movesCacheMap = new MovesCacheMap();
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
super(infoService, invertedIndex);
}
@Override
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) {
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(clusterName, medium);
Preconditions.checkNotNull(movesInProgress, "clusterStat is got from statisticMap, movesCacheMap should have the same entry");
// Iterating through Cache.asMap().values() does not reset access time for the entries you retrieve.
List<TabletMove> movesInProgressList = movesInProgress.get().asMap().values()
.stream().map(p -> p.first).collect(Collectors.toList());
List<Long> toDeleteKeys = Lists.newArrayList();
// The problematic movements will be found in buildClusterInfo(), so here is a simply move completion check
// of moves which have valid ToDeleteReplica.
List<TabletMove> movesNeedCheck = movesInProgress.get().asMap().values()
.stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList());
checkMovesCompleted(movesNeedCheck, toDeleteKeys);
ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo();
// We should assume the in-progress moves have been succeeded to avoid producing the same moves.
// Apply in-progress moves to current cluster stats, use TwoDimensionalGreedyAlgo.ApplyMove for simplicity.
if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) {
return Lists.newArrayList();
}
// Just delete the completed or problematic moves
if (!toDeleteKeys.isEmpty()) {
movesInProgress.get().invalidateAll(toDeleteKeys);
movesInProgressList = movesInProgressList.stream()
.filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
}
// The balancing tasks of other cluster or medium might have failed. We use the upper limit value
// `total num of in-progress moves` to avoid useless selections.
if (movesCacheMap.size() > Config.max_balancing_tablets) {
LOG.debug("Total in-progress moves > {}", Config.max_balancing_tablets);
return Lists.newArrayList();
}
NavigableSet<Long> skews = clusterBalanceInfo.partitionInfoBySkew.keySet();
LOG.debug("Cluster {}-{}: peek max skew {}, assume {} in-progress moves are succeeded {}", clusterName, medium,
skews.isEmpty() ? 0 : skews.last(), movesInProgressList.size(), movesInProgressList);
List<TwoDimensionalGreedyRebalanceAlgo.PartitionMove> moves = algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
// Find all tablets of the specified partition that would have a replica at the source be,
// but would not have a replica at the destination be. That is to satisfy the restriction
// of having no more than one replica of the same tablet per be.
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
tabletIds.removeAll(invalidIds);
// In-progress tablets can't be the candidate too.
tabletIds.removeAll(inProgressIds);
Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
for (long tabletId : tabletIds) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId) {
tabletCandidates.put(tabletId, tabletMeta);
}
}
LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
if (tabletCandidates.isEmpty()) {
continue;
}
// Random pick one candidate to create tabletSchedCtx
Random rand = new Random();
Object[] keys = tabletCandidates.keySet().toArray();
long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(), pickedTabletId, System.currentTimeMillis());
// Balance task's priority is always LOW
tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
alternativeTablets.add(tabletCtx);
// Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
movesInProgress.get().put(pickedTabletId, new Pair<>(new TabletMove(pickedTabletId, move.fromBe, move.toBe), -1L));
counterBalanceMoveCreated.incrementAndGet();
// Synchronize with movesInProgress
inProgressIds.add(pickedTabletId);
}
if (moves.isEmpty()) {
// Balanced cluster should not print too much log messages, so we log it with level debug.
LOG.debug("Cluster {}-{}: cluster is balanced.", clusterName, medium);
} else {
LOG.info("Cluster {}-{}: get {} moves, actually select {} alternative tablets to move. Tablets detail: {}",
clusterName, medium, moves.size(), alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
return alternativeTablets;
}
private boolean buildClusterInfo(ClusterLoadStatistic clusterStat, TStorageMedium medium,
List<TabletMove> movesInProgress, ClusterBalanceInfo info, List<Long> toDeleteKeys) {
Preconditions.checkState(info.beByTotalReplicaCount.isEmpty() && info.partitionInfoBySkew.isEmpty(), "");
// If we wanna modify the PartitionBalanceInfo in info.beByTotalReplicaCount, deep-copy it
info.beByTotalReplicaCount.putAll(clusterStat.getBeByTotalReplicaMap(medium));
info.partitionInfoBySkew.putAll(clusterStat.getSkewMap(medium));
// Skip the toDeleteKeys
List<TabletMove> filteredMoves = movesInProgress.stream().filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
for (TabletMove move : filteredMoves) {
TabletMeta meta = invertedIndex.getTabletMeta(move.tabletId);
if (meta == null) {
// Move's tablet is invalid, need delete it
toDeleteKeys.add(move.tabletId);
continue;
}
TwoDimensionalGreedyRebalanceAlgo.PartitionMove partitionMove = new TwoDimensionalGreedyRebalanceAlgo.PartitionMove(meta.getPartitionId(), meta.getIndexId(), move.fromBe, move.toBe);
boolean st = TwoDimensionalGreedyRebalanceAlgo.applyMove(partitionMove, info.beByTotalReplicaCount, info.partitionInfoBySkew);
if (!st) {
// Can't apply this move, mark it failed, continue to apply the next.
toDeleteKeys.add(move.tabletId);
}
}
return true;
}
private void checkMovesCompleted(List<TabletMove> moves, List<Long> toDeleteKeys) {
boolean moveIsComplete;
for (TabletMove move : moves) {
moveIsComplete = checkMoveCompleted(move);
// If the move was completed, remove it
if (moveIsComplete) {
toDeleteKeys.add(move.tabletId);
LOG.debug("Move {} is completed. The cur dist: {}", move,
invertedIndex.getReplicasByTabletId(move.tabletId).stream().map(Replica::getBackendId).collect(Collectors.toList()));
counterBalanceMoveSucceeded.incrementAndGet();
}
}
}
// Move completed: fromBe doesn't have a replica and toBe has a replica
private boolean checkMoveCompleted(TabletMove move) {
Long tabletId = move.tabletId;
List<Long> bes = invertedIndex.getReplicasByTabletId(tabletId).stream().map(Replica::getBackendId).collect(Collectors.toList());
return !bes.contains(move.fromBe) && bes.contains(move.toBe);
}
@Override
protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, TabletScheduler.PathSlot> backendsWorkingSlots)
throws SchedException {
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getCluster(), tabletCtx.getStorageMedium());
Preconditions.checkNotNull(movesInProgress, "clusterStat is got from statisticMap, movesInProgressMap should have the same entry");
try {
Pair<TabletMove, Long> pair = movesInProgress.get().getIfPresent(tabletCtx.getTabletId());
Preconditions.checkNotNull(pair, "No cached move for tablet: " + tabletCtx.getTabletId());
TabletMove move = pair.first;
checkMoveValidation(move);
// Check src replica's validation
Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe);
Preconditions.checkNotNull(srcReplica);
TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId());
Preconditions.checkNotNull(slot, "unable to get fromBe " + srcReplica.getBackendId() + " slot");
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, "no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
}
// Choose a path in destination
ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster());
Preconditions.checkNotNull(clusterStat, "cluster does not exist: " + tabletCtx.getCluster());
BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(move.toBe);
Preconditions.checkNotNull(beStat);
slot = backendsWorkingSlots.get(move.toBe);
Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);
List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, "paths has no available balance slot: " + availPath);
} else {
tabletCtx.setDest(beStat.getBeId(), pathHash);
}
// ToDeleteReplica is the source replica
pair.second = srcReplica.getId();
} catch (IllegalStateException | NullPointerException e) {
// Problematic move should be invalidated immediately
movesInProgress.get().invalidate(tabletCtx.getTabletId());
throw new SchedException(SchedException.Status.UNRECOVERABLE, e.getMessage());
}
}
// The validation check cannot be accurate, cuz the production of moves do have ordering.
// If some moves failed, the cluster & partition skew is different to the skew when we getNextMove.
// So we can't do skew check.
// Just do some basic checks, e.g. server available.
private void checkMoveValidation(TabletMove move) throws IllegalStateException {
boolean fromAvailable = infoService.checkBackendAvailable(move.fromBe);
boolean toAvailable = infoService.checkBackendAvailable(move.toBe);
Preconditions.checkState(fromAvailable && toAvailable, move + "'s bes are not all available: from " + fromAvailable + ", to " + toAvailable);
// To be improved
}
@Override
public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getCluster(), tabletCtx.getStorageMedium());
// We don't invalidate the cached move here, cuz the redundant repair progress is just started.
// The move should be invalidated by TTL or Algo.CheckMoveCompleted()
Pair<TabletMove, Long> pair = movesInProgress.get().getIfPresent(tabletCtx.getTabletId());
if (pair != null) {
Preconditions.checkState(pair.second != -1L);
return pair.second;
} else {
return (long) -1;
}
}
@Override
public void updateLoadStatistic(Map<String, ClusterLoadStatistic> statisticMap) {
super.updateLoadStatistic(statisticMap);
movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
// Perform cache maintenance
movesCacheMap.maintain();
LOG.debug("Move succeeded/total :{}/{}, current {}",
counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
}
// Represents a concrete move of a tablet from one be to another.
// Formed logically from a PartitionMove by specifying a tablet for the move.
public static class TabletMove {
Long tabletId;
Long fromBe;
Long toBe;
TabletMove(Long id, Long from, Long to) {
this.tabletId = id;
this.fromBe = from;
this.toBe = to;
}
@Override
public String toString() {
return "ReplicaMove{" +
"tabletId=" + tabletId +
", fromBe=" + fromBe +
", toBe=" + toBe +
'}';
}
}
// Balance information for a cluster(one medium), excluding decommissioned/dead bes and replicas on them.
// Natural ordering, so the last key is the max key.
public static class ClusterBalanceInfo {
TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
}
}

View File

@ -65,7 +65,7 @@ public abstract class Rebalancer {
return alternativeTablets;
}
// The return TabletSchedCtx should have the tablet id at least. {srcReplica, destBe} can be complete here or
// The returned TabletSchedCtx should have the tablet id at least. {srcReplica, destBe} can be complete here or
// later(when createBalanceTask called).
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium);
@ -86,7 +86,7 @@ public abstract class Rebalancer {
protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
throws SchedException;
public Long getToDeleteReplicaId(Long tabletId) {
public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
return -1L;
}

View File

@ -138,14 +138,18 @@ public class TabletScheduler extends MasterDaemon {
}
public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
TabletSchedulerStat stat) {
TabletSchedulerStat stat, String rebalancerType) {
super("tablet scheduler", SCHEDULE_INTERVAL_MS);
this.catalog = catalog;
this.infoService = infoService;
this.invertedIndex = invertedIndex;
this.colocateTableIndex = catalog.getColocateTableIndex();
this.stat = stat;
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
if (rebalancerType.equalsIgnoreCase("partition")) {
this.rebalancer = new PartitionRebalancer(infoService, invertedIndex);
} else {
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
}
}
public TabletSchedulerStat getStat() {
@ -852,7 +856,7 @@ public class TabletScheduler extends MasterDaemon {
}
private boolean deleteReplicaChosenByRebalancer(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
Long id = rebalancer.getToDeleteReplicaId(tabletCtx.getTabletId());
Long id = rebalancer.getToDeleteReplicaId(tabletCtx);
if (id == -1L) {
return false;
}

View File

@ -0,0 +1,329 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
import org.apache.doris.common.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
/*
* A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It'll generate multiple `PartitionMove`,
* only decide which partition to move, fromBe, toBe. The next step is to select a tablet to move.
*
* From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
* A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
* The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
* minimum total replica count over all bes.
*
* This class is modified from kudu TwoDimensionalGreedyAlgo.
*/
public class TwoDimensionalGreedyRebalanceAlgo {
private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
private final EqualSkewOption equalSkewOption;
private static final Random rand = new Random(System.currentTimeMillis());
public static class PartitionMove {
Long partitionId;
Long indexId;
Long fromBe;
Long toBe;
PartitionMove(Long p, Long i, Long f, Long t) {
this.partitionId = p;
this.indexId = i;
this.fromBe = f;
this.toBe = t;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PartitionMove that = (PartitionMove) o;
return Objects.equal(partitionId, that.partitionId) &&
Objects.equal(indexId, that.indexId) &&
Objects.equal(fromBe, that.fromBe) &&
Objects.equal(toBe, that.toBe);
}
@Override
public int hashCode() {
return Objects.hashCode(partitionId, indexId, fromBe, toBe);
}
@Override
public String toString() {
return "ReplicaMove{" +
"pid=" + partitionId + "-" + indexId +
", from=" + fromBe +
", to=" + toBe +
'}';
}
}
public enum EqualSkewOption {
// generally only be used on unit test
PICK_FIRST,
PICK_RANDOM
}
public enum ExtremumType {
MAX,
MIN
}
public static class IntersectionResult {
Long replicaCountPartition;
Long replicaCountTotal;
List<Long> beWithExtremumCount;
List<Long> intersection;
}
TwoDimensionalGreedyRebalanceAlgo() {
this(EqualSkewOption.PICK_RANDOM);
}
TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
this.equalSkewOption = equalSkewOption;
}
// maxMovesNum: Value of '0' is a shortcut for 'the possible maximum'.
// May modify the ClusterBalanceInfo
public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
Preconditions.checkArgument(maxMovesNum >= 0);
if (maxMovesNum == 0) {
maxMovesNum = Integer.MAX_VALUE;
}
if (info.partitionInfoBySkew.isEmpty()) {
// Check for the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
// the partition skew, partition count for all the be should be 0.
// Keys are ordered by the natural ordering, so we can get the last(max) key to know if all keys are 0.
NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
LOG.debug(keySet);
Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
"non-zero replica count on be while no partition skew information in skewMap");
// Nothing to balance: cluster is empty.
return Lists.newArrayList();
}
List<PartitionMove> moves = Lists.newArrayList();
for (int i = 0; i < maxMovesNum; ++i) {
PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
// 1. No replicas to move.
// 2. Apply to info failed, it's useless to get next move from the same info.
break;
}
moves.add(move);
}
return moves;
}
private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
PartitionMove move = null;
if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
return null;
}
long maxPartitionSkew = skewMap.keySet().last();
long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
// 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
// is no potential for the greedy algorithm to balance the cluster.
// 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
return null;
}
// Among the partitions with maximum skew, attempt to pick a partition where there is
// a move that improves the partition skew and the cluster skew, if possible. If
// not, attempt to pick a move that improves the partition skew. If all partitions
// are balanced, attempt to pick a move that preserves partition balance and
// improves cluster skew.
NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
for (PartitionBalanceInfo pbi : maxSet) {
Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of " +
"partition " + pbi.partitionId + "-" + pbi.indexId);
Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
LOG.debug("balancing partition {}-{} with replica count skew {} (min_replica_count: {}, max_replica_count: {})",
pbi.partitionId, pbi.indexId, maxPartitionSkew,
minReplicaCount, maxReplicaCount);
// Compute the intersection of the bes most loaded for the table
// with the bes most loaded overall, and likewise for least loaded.
// These are our ideal candidates for moving from and to, respectively.
IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX, pbi.beByReplicaCount, beByTotalReplicaCount);
IntersectionResult minLoaded = getIntersection(ExtremumType.MIN, pbi.beByReplicaCount, beByTotalReplicaCount);
LOG.debug("partition-wise: min_count: {}, max_count: {}", minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);
LOG.debug("cluster-wise: min_count: {}, max_count: {}", minLoaded.replicaCountTotal, maxLoaded.replicaCountTotal);
LOG.debug("min_loaded_intersection: {}, max_loaded_intersection: {}", minLoaded.intersection.toString(), maxLoaded.intersection.toString());
// Do not move replicas of a balanced table if the least (most) loaded
// servers overall do not intersect the servers hosting the least (most)
// replicas of the table. Moving a replica in that case might keep the
// cluster skew the same or make it worse while keeping the table balanced.
if ((maxLoaded.replicaCountPartition <= minLoaded.replicaCountPartition + 1)
&& (minLoaded.intersection.isEmpty() || maxLoaded.intersection.isEmpty())) {
continue;
}
Long minLoadedBe, maxLoadedBe;
if (equalSkewOption == EqualSkewOption.PICK_FIRST) {
// beWithExtremumCount lists & intersection lists are natural ordering
minLoadedBe = minLoaded.intersection.isEmpty() ? minLoaded.beWithExtremumCount.get(0) : minLoaded.intersection.get(0);
maxLoadedBe = maxLoaded.intersection.isEmpty() ? maxLoaded.beWithExtremumCount.get(0) : maxLoaded.intersection.get(0);
} else {
minLoadedBe = minLoaded.intersection.isEmpty() ? getRandomListElement(minLoaded.beWithExtremumCount)
: getRandomListElement(minLoaded.intersection);
maxLoadedBe = maxLoaded.intersection.isEmpty() ? getRandomListElement(maxLoaded.beWithExtremumCount)
: getRandomListElement(maxLoaded.intersection);
}
LOG.debug("min_loaded_be: {}, max_loaded_be: {}", minLoadedBe, maxLoadedBe);
if (minLoadedBe.equals(maxLoadedBe)) {
// Nothing to move.
continue;
}
// Move a replica of the selected partition from a most loaded server to a
// least loaded server.
move = new PartitionMove(pbi.partitionId, pbi.indexId, maxLoadedBe, minLoadedBe);
break;
}
return move;
}
public static <T> T getRandomListElement(List<T> items) {
Preconditions.checkArgument(!items.isEmpty());
return items.get(rand.nextInt(items.size()));
}
public static IntersectionResult getIntersection(ExtremumType extremumType, TreeMultimap<Long, Long> beByReplicaCount,
TreeMultimap<Long, Long> beByTotalReplicaCount) {
Pair<Long, Set<Long>> beSelectedByPartition = getMinMaxLoadedServers(beByReplicaCount, extremumType);
Pair<Long, Set<Long>> beSelectedByTotal = getMinMaxLoadedServers(beByTotalReplicaCount, extremumType);
Preconditions.checkNotNull(beSelectedByPartition);
Preconditions.checkNotNull(beSelectedByTotal);
IntersectionResult res = new IntersectionResult();
res.replicaCountPartition = beSelectedByPartition.first;
res.replicaCountTotal = beSelectedByTotal.first;
res.beWithExtremumCount = Lists.newArrayList(beSelectedByPartition.second);
res.intersection = Lists.newArrayList(Sets.intersection(beSelectedByPartition.second, beSelectedByTotal.second));
return res;
}
private static Pair<Long, Set<Long>> getMinMaxLoadedServers(TreeMultimap<Long, Long> multimap, ExtremumType extremumType) {
if (multimap.isEmpty()) {
return null;
}
Long count = (extremumType == ExtremumType.MIN) ? multimap.keySet().first() : multimap.keySet().last();
return new Pair<>(count, multimap.get(count));
}
// Update the balance state in 'ClusterBalanceInfo'(the two maps) with the outcome of the move 'move'.
// To support apply in-progress moves to current cluster balance info, if apply failed, the maps should not be modified.
public static boolean applyMove(PartitionMove move, TreeMultimap<Long, Long> beByTotalReplicaCount,
TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
// Update the total counts
moveOneReplica(move.fromBe, move.toBe, beByTotalReplicaCount);
try {
PartitionBalanceInfo partitionBalanceInfo = null;
Long skew = -1L;
for (Long key : skewMap.keySet()) {
NavigableSet<PartitionBalanceInfo> pbiSet = skewMap.get(key);
List<PartitionBalanceInfo> pbis = pbiSet.stream().filter(info ->
info.partitionId.equals(move.partitionId) && info.indexId.equals(move.indexId)).collect(Collectors.toList());
Preconditions.checkState(pbis.size() <= 1, "skew map has dup partition info");
if (pbis.size() == 1) {
partitionBalanceInfo = pbis.get(0);
skew = key;
break;
}
}
Preconditions.checkState(skew != -1L, "partition is not in skew map");
PartitionBalanceInfo newInfo = new PartitionBalanceInfo(partitionBalanceInfo);
moveOneReplica(move.fromBe, move.toBe, newInfo.beByReplicaCount);
skewMap.remove(skew, partitionBalanceInfo);
long min_count = newInfo.beByReplicaCount.keySet().first();
long max_count = newInfo.beByReplicaCount.keySet().last();
skewMap.put(max_count - min_count, newInfo);
} catch (IllegalStateException e) {
// If touch IllegalState, the skew map doesn't be modified, so we should rollback the move of beByTotalReplicaCount
moveOneReplica(move.toBe, move.fromBe, beByTotalReplicaCount);
LOG.info("{} apply failed, {}", move, e.getMessage());
return false;
} catch (Exception e) {
// Rollback the move of beByTotalReplicaCount is meaningless here
LOG.warn("got unexpected exception when apply {}, the skew may be broken. {}", move, e.toString());
throw e;
}
return true;
}
// Applies to 'm' a move of a replica from the be with id 'src' to the be with id 'dst' by decrementing
// the count of 'src' and incrementing the count of 'dst'.
// If check failed, won't modify the map.
private static void moveOneReplica(Long fromBe, Long toBe, TreeMultimap<Long, Long> m) throws IllegalStateException {
boolean foundSrc = false;
boolean foundDst = false;
Long countSrc = 0L;
Long countDst = 0L;
for (Long key : m.keySet()) {
// set is arbitrary ordering, need to convert
Set<Long> values = m.get(key);
if (values.contains(fromBe)) {
foundSrc = true;
countSrc = key;
}
if (values.contains(toBe)) {
foundDst = true;
countDst = key;
}
}
Preconditions.checkState(foundSrc, "fromBe " + fromBe + " is not in the map");
Preconditions.checkState(foundDst, "toBe " + toBe + " is not in the map");
Preconditions.checkState(countSrc > 0, "fromBe has no replica in the map, can't move");
m.remove(countSrc, fromBe);
m.remove(countDst, toBe);
m.put(countSrc - 1, fromBe);
m.put(countDst + 1, toBe);
}
}

View File

@ -1011,6 +1011,18 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_balancing_tablets = 100;
// Rebalancer type(ignore case): BeLoad, Partition. If type parse failed, use BeLoad as default.
@ConfField(masterOnly = true)
public static String tablet_rebalancer_type = "BeLoad";
// Valid only if use PartitionRebalancer. If this changed, cached moves will be cleared.
@ConfField(mutable = true, masterOnly = true)
public static long partition_rebalance_move_expire_after_access = 600; // 600s
// Valid only if use PartitionRebalancer
@ConfField(mutable = true, masterOnly = true)
public static int partition_rebalance_max_moves_num_per_selection = 10;
// This threshold is to avoid piling up too many report task in FE, which may cause OOM exception.
// In some large Doris cluster, eg: 100 Backends with ten million replicas, a tablet report may cost
// several seconds after some modification of metadata(drop partition, etc..).

View File

@ -0,0 +1,301 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.CloneTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static com.google.common.collect.MoreCollectors.onlyElement;
public class RebalanceTest {
private static final Logger LOG = LogManager.getLogger(RebalanceTest.class);
@Mocked
private Catalog catalog;
private long id = 10086;
private Database db;
private OlapTable olapTable;
private final SystemInfoService systemInfoService = new SystemInfoService();
private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
private Map<String, ClusterLoadStatistic> statisticMap;
@Before
public void setUp() throws AnalysisException {
db = new Database(1, "test db");
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
new Expectations() {
{
catalog.getDbIds();
minTimes = 0;
result = db.getId();
catalog.getDb(anyLong);
minTimes = 0;
result = db;
Catalog.getCurrentCatalogJournalVersion();
minTimes = 0;
result = FeConstants.meta_version;
catalog.getNextId();
minTimes = 0;
result = new Delegate() {
long a() {
return id++;
}
};
Catalog.getCurrentSystemInfo();
minTimes = 0;
result = systemInfoService;
Catalog.getCurrentInvertedIndex();
minTimes = 0;
result = invertedIndex;
Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
result = 111;
Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List<Long>) any);
result = true;
}
};
// Test mock validation
Assert.assertEquals(111, Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
beIds.forEach(id -> systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
new RangePartitionInfo(), new HashDistributionInfo());
db.createTable(olapTable);
// 1 table, 3 partitions p0,p1,p2
MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
createPartitionsForTable(olapTable, materializedIndex, 3L);
olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
// Tablet distribution: we add them to olapTable & build invertedIndex manually
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
50000, Lists.newArrayList(10001L, 10002L, 10003L));
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
60000, Lists.newArrayList(10001L, 10002L, 10003L));
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
70000, Lists.newArrayList(10001L, 10002L, 10003L));
// be4(10004) doesn't have any replica
generateStatisticMap();
}
private void generateStatisticMap() {
ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
systemInfoService, invertedIndex);
loadStatistic.init();
statisticMap = Maps.newConcurrentMap();
statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, loadStatistic);
}
private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
// partition id start from 31
LongStream.range(0, partitionCount).forEach(idx -> {
long id = 31 + idx;
Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), (short) 3, false);
});
}
@Test
public void testPartitionRebalancer() {
Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);
// Disable scheduler's rebalancer adding balance task, add balance tasks manually
Config.disable_balance = true;
// Create a new scheduler & checker for redundant tablets handling
// Call runAfterCatalogReady manually instead of starting daemon thread
TabletSchedulerStat stat = new TabletSchedulerStat();
PartitionRebalancer rebalancer = new PartitionRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
TabletScheduler tabletScheduler = new TabletScheduler(catalog, systemInfoService, invertedIndex, stat, "");
// The rebalancer inside the scheduler will use this rebalancer, for getToDeleteReplicaId
Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
TabletChecker tabletChecker = new TabletChecker(catalog, systemInfoService, tabletScheduler, stat);
rebalancer.updateLoadStatistic(statisticMap);
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
// Run once for update slots info, scheduler won't select balance cuz balance is disabled
tabletScheduler.runAfterCatalogReady();
AgentBatchTask batchTask = new AgentBatchTask();
for (TabletSchedCtx tabletCtx : alternativeTablets) {
LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
try {
tabletCtx.setStorageMedium(TStorageMedium.HDD);
tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId()).getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId()));
tabletCtx.setVersionInfo(1, 0, 1, 0);
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
// createCloneReplicaAndTask, create replica will change invertedIndex too.
rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
} catch (SchedException e) {
LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
}
}
// Show debug info of MoveInProgressMap detail
rebalancer.updateLoadStatistic(statisticMap);
rebalancer.selectAlternativeTablets();
// Get created tasks, and finish them manually
List<AgentTask> tasks = batchTask.getAllTasks();
List<Long> needCheckTablets = tasks.stream().map(AgentTask::getTabletId).collect(Collectors.toList());
LOG.info("created tasks for tablet: {}", needCheckTablets);
needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
// // If clone task execution is too slow, tabletChecker may want to delete the CLONE replica.
// tabletChecker.runAfterCatalogReady();
// Assert.assertTrue(tabletScheduler.containsTablet(50000));
// // tabletScheduler handle redundant
// tabletScheduler.runAfterCatalogReady();
for (Long tabletId : needCheckTablets) {
TabletSchedCtx tabletSchedCtx = alternativeTablets.stream().filter(ctx -> ctx.getTabletId() == tabletId).collect(onlyElement());
AgentTask task = tasks.stream().filter(t -> t.getTabletId() == tabletId).collect(onlyElement());
LOG.info("try to finish tabletCtx {}", tabletId);
try {
TFinishTaskRequest fakeReq = new TFinishTaskRequest();
fakeReq.task_status = new TStatus(TStatusCode.OK);
fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
} catch (SchedException e) {
e.printStackTrace();
}
}
// NeedCheckTablets are redundant, TabletChecker will add them to TabletScheduler
tabletChecker.runAfterCatalogReady();
needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
needCheckTablets.forEach(t -> Assert.assertTrue(tabletScheduler.containsTablet(t)));
// TabletScheduler handle redundant tablet
tabletScheduler.runAfterCatalogReady();
// One replica is set to DECOMMISSION, still 4 replicas
needCheckTablets.forEach(t -> {
List<Replica> replicas = invertedIndex.getReplicasByTabletId(t);
Assert.assertEquals(4, replicas.size());
Replica decommissionedReplica = replicas.stream().filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION).collect(onlyElement());
// expected watermarkTxnId is 111
Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId());
});
// Delete replica should change invertedIndex too
tabletScheduler.runAfterCatalogReady();
needCheckTablets.forEach(t -> Assert.assertEquals(3, invertedIndex.getReplicasByTabletId(t).size()));
// Check moves completed
rebalancer.selectAlternativeTablets();
rebalancer.updateLoadStatistic(statisticMap);
AtomicLong succeeded = Deencapsulation.getField(rebalancer, "counterBalanceMoveSucceeded");
Assert.assertEquals(needCheckTablets.size(), succeeded.get());
}
@Test
public void testMoveInProgressMap() {
Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache", Level.DEBUG);
MovesCacheMap m = new MovesCacheMap();
m.updateMapping(statisticMap, 3);
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD).get().put(1L, new Pair<>(null, -1L));
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(2L, new Pair<>(null, -1L));
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
// Maintenance won't clean up the entries of cache
m.maintain();
Assert.assertEquals(3, m.size());
// Reset the expireAfterAccess, the whole cache map will be cleared.
m.updateMapping(statisticMap, 1);
Assert.assertEquals(0, m.size());
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
try {
Thread.sleep(1000);
m.maintain();
Assert.assertEquals(0, m.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
public class RebalancerTestUtil {
// Add only one path, PathHash:id
public static Backend createBackend(long id, long totalCap, long usedCap) {
// ip:port won't be checked
Backend be = new Backend(id, "192.168.0." + id, 9051);
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo = new DiskInfo("/path1");
diskInfo.setPathHash(id);
diskInfo.setTotalCapacityB(totalCap);
diskInfo.setDataUsedCapacityB(usedCap);
disks.put(diskInfo.getRootPath(), diskInfo);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
return be;
}
// Create one tablet(and its replicas) for one partition. The replicas will created on backends which are numbered in beIds.
// The tablet will be added to TabletInvertedIndex & OlapTable.
// Only use the partition's baseIndex for simplicity
public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
int tabletId, List<Long> beIds) {
Partition partition = olapTable.getPartition(partitionName);
MaterializedIndex baseIndex = partition.getBaseIndex();
int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(),
schemaHash, medium);
Tablet tablet = new Tablet(tabletId);
// add tablet to olapTable
olapTable.getPartition("p0").getBaseIndex().addTablet(tablet, tabletMeta);
createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
}
// Create replicas on backends which are numbered in beIds.
// The tablet & replicas will be added to invertedIndex.
public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
invertedIndex.addTablet(tablet.getId(), tabletMeta);
IntStream.range(0, beIds.size()).forEach(i -> {
Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, 0, tabletMeta.getOldSchemaHash());
// We've set pathHash to beId for simplicity
replica.setPathHash(beIds.get(i));
// isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
tablet.addReplica(replica, true);
invertedIndex.addReplica(tablet.getId(), replica);
});
}
}

View File

@ -0,0 +1,299 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
import org.apache.doris.clone.TwoDimensionalGreedyRebalanceAlgo.PartitionMove;
import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
import org.apache.doris.common.Pair;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
public class TwoDimensionalGreedyRebalanceAlgoTest {
private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgoTest.class);
TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo(TwoDimensionalGreedyRebalanceAlgo.EqualSkewOption.PICK_FIRST);
// Structure to describe rebalancing-related state of the cluster expressively
// enough for the tests.
private static class TestClusterConfig {
static class PartitionPerBeReplicas {
Long partitionId;
Long indexId;
// Number of replicas of this partition on each server in the cluster.
// By definition, the indices in this container correspond to indices
// in TestClusterConfig::beIds.
List<Long> numReplicasByServer;
PartitionPerBeReplicas(Long p, Long i, List<Long> l) {
this.partitionId = p;
this.indexId = i;
this.numReplicasByServer = l;
}
}
// IDs of bes; every element must be unique.
List<Long> beIds = Lists.newArrayList();
// Distribution of partition replicas across the bes. The following
// constraints should be in place:
// * for each p in partitionReplicas:
// p.numReplicasByServer.size() == beIds.size()
List<PartitionPerBeReplicas> partitionReplicas = Lists.newArrayList();
// The expected replica movements: the reference output of the algorithm
// to compare with.
List<PartitionMove> expectedMoves = Lists.newArrayList();
// TODO MovesOrderingComparison: Options controlling how the reference and the actual results are compared.
// PartitionBalanceInfos in skew map are arbitrary ordering, so we can't get the fixed moves
// when more than one partition have the maxSkew.
}
// Transform the definition of the test cluster into the ClusterInfo
// that is consumed by the rebalancing algorithm.
private ClusterBalanceInfo ClusterConfigToClusterBalanceInfo(TestClusterConfig tcc) {
// First verify that the configuration of the test cluster is valid.
Set<Pair<Long, Long>> partitionIds = Sets.newHashSet();
for (TestClusterConfig.PartitionPerBeReplicas p : tcc.partitionReplicas) {
Assert.assertEquals(tcc.beIds.size(), p.numReplicasByServer.size());
partitionIds.add(new Pair<>(p.partitionId, p.indexId));
}
Assert.assertEquals(partitionIds.size(), tcc.partitionReplicas.size());
// Check for uniqueness of the tablet servers' identifiers.
Set<Long> beIdSet = new HashSet<>(tcc.beIds);
Assert.assertEquals(tcc.beIds.size(), beIdSet.size());
ClusterBalanceInfo balance = new ClusterBalanceInfo();
for (int beIdx = 0; beIdx < tcc.beIds.size(); ++beIdx) {
// Total replica count at the tablet server.
long count = 0;
for (TestClusterConfig.PartitionPerBeReplicas p : tcc.partitionReplicas) {
count += p.numReplicasByServer.get(beIdx);
}
balance.beByTotalReplicaCount.put(count, tcc.beIds.get(beIdx));
}
for (int pIdx = 0; pIdx < tcc.partitionReplicas.size(); ++pIdx) {
// Replicas of the current partition per be.
TestClusterConfig.PartitionPerBeReplicas distribution = tcc.partitionReplicas.get(pIdx);
PartitionBalanceInfo info = new PartitionBalanceInfo(distribution.partitionId, distribution.indexId);
List<Long> replicaCount = distribution.numReplicasByServer;
IntStream.range(0, replicaCount.size()).forEach(i -> info.beByReplicaCount.put(replicaCount.get(i), tcc.beIds.get(i)));
Long max_count = info.beByReplicaCount.keySet().last();
Long min_count = info.beByReplicaCount.keySet().first();
Assert.assertTrue(max_count >= min_count);
balance.partitionInfoBySkew.put(max_count - min_count, info);
}
return balance;
}
private void verifyMoves(List<TestClusterConfig> configs) {
for (TestClusterConfig config : configs) {
List<PartitionMove> moves = algo.getNextMoves(ClusterConfigToClusterBalanceInfo(config), 0);
Assert.assertEquals(moves, config.expectedMoves);
}
}
@Before
public void setUp() {
Configurator.setLevel("org.apache.doris.clone.TwoDimensionalGreedyAlgo", Level.WARN);
}
@Test
public void testApplyMoveFailed() {
PartitionMove move = new PartitionMove(11L, 22L, 10001L, 10002L);
// total count is valid
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
beByTotalReplicaCount.put(10L, 10001L);
beByTotalReplicaCount.put(10L, 10002L);
// no info of partition
TreeMultimap<Long, PartitionBalanceInfo> skewMap = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
try {
TwoDimensionalGreedyRebalanceAlgo.applyMove(move, beByTotalReplicaCount, skewMap);
} catch (Exception e) {
Assert.assertSame(e.getClass(), IllegalStateException.class);
LOG.info(e.getMessage());
}
// beByTotalReplicaCount should be modified
Assert.assertEquals(0, beByTotalReplicaCount.keySet().stream().filter(skew -> skew != 10L).count());
// invalid info of partition
skewMap.put(6L, new PartitionBalanceInfo(11L, 22L));
try {
TwoDimensionalGreedyRebalanceAlgo.applyMove(move, beByTotalReplicaCount, skewMap);
} catch (Exception e) {
Assert.assertSame(e.getClass(), IllegalStateException.class);
LOG.warn(e.getMessage());
}
// beByTotalReplicaCount should be modified
Assert.assertEquals(0, beByTotalReplicaCount.keySet().stream().filter(skew -> skew != 10L).count());
}
@Test
public void testInvalidClusterBalanceInfo() {
Configurator.setLevel("org.apache.doris.clone.TwoDimensionalGreedyAlgo", Level.DEBUG);
try {
algo.getNextMoves(new ClusterBalanceInfo(), 0);
} catch (Exception e) {
Assert.fail();
}
try {
algo.getNextMoves(new ClusterBalanceInfo() {{
beByTotalReplicaCount.put(0L, 10001L);
}}, 0);
} catch (Exception e) {
Assert.fail();
}
try {
// Invalid balance info will cause IllegalStateException
algo.getNextMoves(new ClusterBalanceInfo() {
{
beByTotalReplicaCount.put(0L, 10001L);
beByTotalReplicaCount.put(1L, 10002L);
}
}, 0);
Assert.fail("Exception will be thrown in GetNextMoves");
} catch (Exception e) {
Assert.assertSame(e.getClass(), IllegalStateException.class);
LOG.info(e.getMessage());
}
}
// Partition- and cluster-wise balanced configuration with one-off skew.
// Algorithm won't consider about the tablet health
@Test
public void testAlreadyBalanced() {
List<TestClusterConfig> configs = Lists.newArrayList(
// A single be with a single replica of the only partition.
new TestClusterConfig() {{
beIds.add(10001L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(1L)));
// expectedMoves is empty
}},
// A single be in the cluster that hosts all replicas.
new TestClusterConfig() {{
beIds.add(10001L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(1L)));
partitionReplicas.add(new PartitionPerBeReplicas(22L, 44L, Lists.newArrayList(10L)));
partitionReplicas.add(new PartitionPerBeReplicas(22L, 55L, Lists.newArrayList(10L)));
}},
// Single partition and 2 be: 100 and 99 replicas at each.
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(100L, 99L)));
}}
);
verifyMoves(configs);
}
// TODO after MovesOrderingComparison supported
// Set of scenarios where the distribution of replicas is partition-wise balanced
// but not yet cluster-wise balanced, requiring just a few replica moves
// to achieve both partition- and cluster-wise balance state.
// TODO add more tests after MovesOrderingComparison supported
// Set of scenarios where the distribution of table replicas is cluster-wise
// balanced, but not table-wise balanced, requiring just few moves to make it
// both table- and cluster-wise balanced.
@Test
public void testClusterWiseBalanced() {
List<TestClusterConfig> configs = Lists.newArrayList(
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(2L, 0L)));
partitionReplicas.add(new PartitionPerBeReplicas(22L, 44L, Lists.newArrayList(1L, 2L)));
expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
}}
);
verifyMoves(configs);
}
// Unbalanced (both table- and cluster-wise) and simple enough configurations
// to make them balanced moving just few replicas.
@Test
public void testFewMoves() {
List<TestClusterConfig> configs = Lists.newArrayList(
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(2L, 0L)));
expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
}},
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(3L, 0L)));
expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
}},
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(4L, 0L)));
expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
}}
);
verifyMoves(configs);
}
// Unbalanced (both table- and cluster-wise) and simple enough configurations to
// make them balanced moving many replicas around.
@Test
public void testManyMoves() {
List<TestClusterConfig> configs = Lists.newArrayList(
new TestClusterConfig() {{
beIds.add(10001L);
beIds.add(10002L);
beIds.add(10003L);
partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(100L, 400L, 100L)));
for (int i = 0; i < 200; i++) {
if (i % 2 == 1) {
expectedMoves.add(new PartitionMove(22L, 33L, 10002L, 10003L));
} else {
expectedMoves.add(new PartitionMove(22L, 33L, 10002L, 10001L));
}
}
}}
);
verifyMoves(configs);
}
}