[BUG] Fix Colocate table balance bug (#4936)
Fix bug that colocation group is always in unstable status.
This commit is contained in:
@ -26,7 +26,6 @@ import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
@ -44,7 +43,6 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -74,241 +72,95 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
|
||||
@Override
|
||||
/*
|
||||
* Each round, we do 3 steps:
|
||||
* 1. Relocate group:
|
||||
* Each round, we do 2 steps:
|
||||
* 1. Relocate and balance group:
|
||||
* Backend is not available, find a new backend to replace it.
|
||||
* Relocate at most one bucket in one group at a time.
|
||||
*
|
||||
* and after all unavailable has been replaced, balance the group
|
||||
*
|
||||
* 2. Match group:
|
||||
* If replica mismatch backends in a group, that group will be marked as unstable, and pass that
|
||||
* tablet to TabletScheduler.
|
||||
* Otherwise, mark the group as stable
|
||||
*
|
||||
* 3. Balance group:
|
||||
* Try balance group, and skip groups which contains unavailable backends.
|
||||
*/
|
||||
protected void runAfterCatalogReady() {
|
||||
relocateGroup();
|
||||
relocateAndBalanceGroup();
|
||||
matchGroup();
|
||||
balanceGroup();
|
||||
}
|
||||
|
||||
/*
|
||||
* Traverse all colocate groups, for each group:
|
||||
* Check if there are backends dead or unavailable(decommission)
|
||||
* If yes, for each buckets in this group, if the unavailable backend belongs to this bucket, we will find
|
||||
* a new backend to replace the unavailable one.
|
||||
*
|
||||
* eg:
|
||||
* original bucket backends sequence is:
|
||||
* [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
|
||||
*
|
||||
* and backend 3 is dead, so we will find an available backend(eg. backend 4) to replace backend 3.
|
||||
* [[1, 2, 4], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
|
||||
*
|
||||
* NOTICE that in this example, we only replace the #3 backend in first bucket. That is, we only replace
|
||||
* one bucket in one group at each round. because we need to use newly-updated cluster load statistic to
|
||||
* find next available backend. and cluster load statistic is updated every 20 seconds.
|
||||
* relocate and balance group
|
||||
* here we just let replicas in colocate table evenly distributed in cluster, not consider the
|
||||
* cluster load statistic.
|
||||
* for example:
|
||||
* currently there are 4 backends A B C D with following load:
|
||||
*
|
||||
* +-+
|
||||
* | |
|
||||
* +-+ +-+ +-+ | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*
|
||||
* And colocate group balancer will still evenly distribute the replicas to all 4 backends, not
|
||||
* just 3 low load backends.
|
||||
*
|
||||
* X
|
||||
* X
|
||||
* X X X +-+
|
||||
* X X X | |
|
||||
* +-+ +-+ +-+ | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*
|
||||
* So After colocate balance, the cluster may still 'unbalanced' from a global perspective.
|
||||
* And the LoadBalancer will balance the non-colocate table's replicas to make the
|
||||
* cluster balance, eventually.
|
||||
*
|
||||
* X X X X
|
||||
* X X X X
|
||||
* +-+ +-+ +-+ +-+
|
||||
* | | | | | | | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*/
|
||||
private void relocateGroup() {
|
||||
if (Config.disable_colocate_relocate) {
|
||||
private void relocateAndBalanceGroup() {
|
||||
if (Config.disable_colocate_balance) {
|
||||
return;
|
||||
}
|
||||
|
||||
Catalog catalog = Catalog.getCurrentCatalog();
|
||||
ColocateTableIndex colocateIndex = catalog.getColocateTableIndex();
|
||||
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
|
||||
Map<String, ClusterLoadStatistic> statisticMap = Catalog.getCurrentCatalog().getTabletScheduler().getStatisticMap();
|
||||
long currTime = System.currentTimeMillis();
|
||||
|
||||
|
||||
// get all groups
|
||||
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
|
||||
for (GroupId groupId : groupIds) {
|
||||
// get all backends in this group
|
||||
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
|
||||
long unavailableBeId = -1;
|
||||
for (Long backendId : backends) {
|
||||
// find an unavailable backend. even if there are than one unavailable backend,
|
||||
// we just handle the first one.
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
unavailableBeId = backendId;
|
||||
break;
|
||||
} else if (!be.isAvailable()) {
|
||||
// 1. BE is dead for a long time
|
||||
// 2. BE is under decommission
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|
||||
|| be.isDecommissioned()) {
|
||||
unavailableBeId = backendId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (unavailableBeId == -1) {
|
||||
// all backends in this group are available.
|
||||
// But in previous version we had a bug that replicas of a tablet may be located on same host.
|
||||
// we have to check it.
|
||||
List<List<Long>> backendsPerBucketsSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
|
||||
OUT: for (List<Long> backendIds : backendsPerBucketsSeq) {
|
||||
Set<String> hosts = Sets.newHashSet();
|
||||
for (Long beId : backendIds) {
|
||||
Backend be = infoService.getBackend(beId);
|
||||
if (be == null) {
|
||||
// backend can be dropped any time, just skip this bucket
|
||||
break;
|
||||
}
|
||||
if (!hosts.add(be.getHost())) {
|
||||
// find replicas on same host. simply mark this backend as unavailable,
|
||||
// so that following step will find another backend
|
||||
unavailableBeId = beId;
|
||||
break OUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (unavailableBeId == -1) {
|
||||
// if everything is ok, continue
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// find the first bucket which contains the unavailable backend
|
||||
LOG.info("backend {} is unavailable in colocate group {}", unavailableBeId, groupId);
|
||||
List<Set<Long>> bucketBackendsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
|
||||
int tabletOrderIdx = 0;
|
||||
for (Set<Long> set : bucketBackendsSeq) {
|
||||
if (set.contains(unavailableBeId)) {
|
||||
break;
|
||||
}
|
||||
tabletOrderIdx++;
|
||||
}
|
||||
|
||||
// select a new backend to replace the unavailable one
|
||||
long newBackendId = selectSubstituteBackend(tabletOrderIdx, groupId, unavailableBeId,
|
||||
bucketBackendsSeq.get(tabletOrderIdx), statisticMap);
|
||||
if (newBackendId != -1) {
|
||||
// replace backend
|
||||
bucketBackendsSeq.get(tabletOrderIdx).remove(unavailableBeId);
|
||||
bucketBackendsSeq.get(tabletOrderIdx).add(newBackendId);
|
||||
colocateIndex.setBackendsSetByIdxForGroup(groupId, tabletOrderIdx, bucketBackendsSeq.get(tabletOrderIdx));
|
||||
LOG.info("select backend {} to replace backend {} for bucket {} in group {}. now backends set is: {}",
|
||||
newBackendId, unavailableBeId, tabletOrderIdx, groupId, bucketBackendsSeq.get(tabletOrderIdx));
|
||||
}
|
||||
|
||||
// only handle one backend at a time
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Select a substitute backend for specified bucket and colocate group.
|
||||
* return -1 if backend not found.
|
||||
* we need to move all replicas of this bucket to the new backend, so we have to check if the new
|
||||
* backend can save all these replicas.
|
||||
*/
|
||||
private long selectSubstituteBackend(int tabletOrderIdx, GroupId groupId, long unavailableBeId,
|
||||
Set<Long> excludeBeIds, Map<String, ClusterLoadStatistic> statisticMap) {
|
||||
ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
|
||||
Database db = Catalog.getCurrentCatalog().getDb(groupId.dbId);
|
||||
if (db == null) {
|
||||
LOG.info("db {} does not exist", groupId.dbId);
|
||||
return -1;
|
||||
}
|
||||
ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName());
|
||||
if (statistic == null) {
|
||||
LOG.info("cluster {} statistic does not exist", db.getClusterName());
|
||||
return -1;
|
||||
}
|
||||
|
||||
// calculate the total replica size of this bucket
|
||||
List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
|
||||
long totalReplicaNum = 0;
|
||||
long totalReplicaSize = 0;
|
||||
db.readLock();
|
||||
try {
|
||||
for (Long tblId : tableIds) {
|
||||
OlapTable tbl = (OlapTable) db.getTable(tblId);
|
||||
if (tbl == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Partition partition : tbl.getPartitions()) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
|
||||
long tabletId = index.getTabletIdsInOrder().get(tabletOrderIdx);
|
||||
Tablet tablet = index.getTablet(tabletId);
|
||||
Replica replica = tablet.getReplicaByBackendId(unavailableBeId);
|
||||
if (replica != null) {
|
||||
totalReplicaNum++;
|
||||
totalReplicaSize += replica.getDataSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
LOG.debug("the number and size of replicas on backend {} of bucket {} is: {} and {}",
|
||||
unavailableBeId, tabletOrderIdx, totalReplicaNum, totalReplicaSize);
|
||||
|
||||
/*
|
||||
* There is an unsolved problem of finding a new backend for data migration:
|
||||
* Different table(partition) in this group may in different storage medium(SSD or HDD). If one backend
|
||||
* is down, the best solution is to find a backend which has both SSD and HDD, and replicas can be
|
||||
* relocated in corresponding storage medium.
|
||||
* But in fact, backends can be heterogeneous, which may only has SSD or HDD. If we choose to strictly
|
||||
* find backends with expecting storage medium, this may lead to a consequence that most of replicas
|
||||
* are gathered in a small portion of backends.
|
||||
*
|
||||
* So for simplicity, we ignore the storage medium property, just find a low load backend which has
|
||||
* capacity to save these replicas.
|
||||
*/
|
||||
List<BackendLoadStatistic> beStats = statistic.getSortedBeLoadStats(null /* mix medium */);
|
||||
if (beStats.isEmpty()) {
|
||||
LOG.warn("failed to relocate backend for colocate group: {}, no backends found", groupId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// the selected backend should not be on same host of other backends of this bucket.
|
||||
// here we generate a host set for further checking.
|
||||
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
|
||||
Set<String> excludeHosts = Sets.newHashSet();
|
||||
for (Long excludeBeId : excludeBeIds) {
|
||||
Backend be = infoService.getBackend(excludeBeId);
|
||||
if (be == null) {
|
||||
LOG.info("Backend {} has been dropped when finding backend for colocate group {}", excludeBeId, groupId);
|
||||
Database db = catalog.getDb(groupId.dbId);
|
||||
if (db == null) {
|
||||
continue;
|
||||
}
|
||||
excludeHosts.add(be.getHost());
|
||||
}
|
||||
Preconditions.checkState(excludeBeIds.size() >= excludeHosts.size());
|
||||
|
||||
// beStats is ordered by load score, ascend. so finding the available from first to last
|
||||
BackendLoadStatistic chosenBe = null;
|
||||
for (BackendLoadStatistic beStat : beStats) {
|
||||
if (beStat.isAvailable() && beStat.getBeId() != unavailableBeId && !excludeBeIds.contains(beStat.getBeId())) {
|
||||
Backend be = infoService.getBackend(beStat.getBeId());
|
||||
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
if (excludeHosts.contains(be.getHost())) {
|
||||
continue;
|
||||
}
|
||||
chosenBe = beStat;
|
||||
break;
|
||||
Map<String, ClusterLoadStatistic> statisticMap = catalog.getTabletScheduler().getStatisticMap();
|
||||
if (statisticMap == null) {
|
||||
continue;
|
||||
}
|
||||
ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName());
|
||||
if (statistic == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
|
||||
List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
|
||||
colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
|
||||
LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
|
||||
}
|
||||
}
|
||||
if (chosenBe == null) {
|
||||
LOG.warn("failed to find an available backend to relocate for colocate group: {}", groupId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// check if there is enough capacity to save all these replicas
|
||||
if (!chosenBe.canFitInColocate(totalReplicaSize)) {
|
||||
LOG.warn("no backend has enough capacity to save replicas in group {} with bucket: {}", groupId, tabletOrderIdx);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return chosenBe.getBeId();
|
||||
}
|
||||
|
||||
/*
|
||||
@ -395,93 +247,6 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
} // end for groups
|
||||
}
|
||||
|
||||
/*
|
||||
* Balance colocate groups which are unstable
|
||||
* here we just let replicas in colocate table evenly distributed in cluster, not consider the
|
||||
* cluster load statistic.
|
||||
* for example:
|
||||
* currently there are 4 backends A B C D with following load:
|
||||
*
|
||||
* +-+
|
||||
* | |
|
||||
* +-+ +-+ +-+ | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*
|
||||
* And colocate group balancer will still evenly distribute the replicas to all 4 backends, not
|
||||
* just 3 low load backends.
|
||||
*
|
||||
* X
|
||||
* X
|
||||
* X X X +-+
|
||||
* X X X | |
|
||||
* +-+ +-+ +-+ | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*
|
||||
* So After colocate balance, the cluster may still 'unbalanced' from a global perspective.
|
||||
* And the LoadBalancer will balance the non-colocate table's replicas to make the
|
||||
* cluster balance, eventually.
|
||||
*
|
||||
* X X X X
|
||||
* X X X X
|
||||
* +-+ +-+ +-+ +-+
|
||||
* | | | | | | | |
|
||||
* | | | | | | | |
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*/
|
||||
private void balanceGroup() {
|
||||
if (Config.disable_colocate_balance) {
|
||||
return;
|
||||
}
|
||||
Catalog catalog = Catalog.getCurrentCatalog();
|
||||
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
|
||||
ColocateTableIndex colocateIndex = catalog.getColocateTableIndex();
|
||||
|
||||
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
|
||||
for (GroupId groupId : groupIds) {
|
||||
// skip unstable groups
|
||||
if (colocateIndex.isGroupUnstable(groupId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// skip backend unavailable groups
|
||||
Set<Long> backendIds = colocateIndex.getBackendsByGroup(groupId);
|
||||
boolean isAllBackendsAvailable = true;
|
||||
for (Long beId : backendIds) {
|
||||
Backend be = infoService.getBackend(beId);
|
||||
if (be == null || !be.isAvailable()) {
|
||||
isAllBackendsAvailable = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!isAllBackendsAvailable) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// all backends are good
|
||||
Database db = catalog.getDb(groupId.dbId);
|
||||
if (db == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Long> allBackendIds = infoService.getClusterBackendIds(db.getClusterName(), true);
|
||||
List<Long> allBackendIdsAvailable = allBackendIds.stream()
|
||||
.filter(infoService::checkBackendAvailable)
|
||||
.collect(Collectors.toList());
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
if (balance(groupId, allBackendIdsAvailable, colocateIndex, infoService, balancedBackendsPerBucketSeq)) {
|
||||
colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info);
|
||||
LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* The balance logic is as follow:
|
||||
*
|
||||
@ -508,7 +273,7 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
* Algorithm:
|
||||
* 0. Generate the flat list of backends per bucket sequence:
|
||||
* A B C A D E A F G A H I
|
||||
* 1. Sort the backend in this order by replication num, descending:
|
||||
* 1. Sort backends order by replication num and load score for same replication num backends, descending:
|
||||
* A B C D E F G H I J
|
||||
* 2. Check the diff of the first backend(A)'s replica num and last backend(J)'s replica num.
|
||||
* If diff is less or equal than 1, we consider this group as balance. Jump to step 5.
|
||||
@ -520,12 +285,16 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
* Partition this flat list by replication num:
|
||||
* [J B C] [J D E] [A F G] [A H I]
|
||||
* And this is our new balanced backends per bucket sequence.
|
||||
*
|
||||
*
|
||||
* relocate is similar to balance, but choosing unavailable be as src, and move all bucketIds on unavailable be to
|
||||
* low be
|
||||
*
|
||||
* Return true if backends per bucket sequence change and new sequence is saved in balancedBackendsPerBucketSeq.
|
||||
* Return false if nothing changed.
|
||||
*/
|
||||
private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, ColocateTableIndex colocateIndex,
|
||||
SystemInfoService infoService, List<List<Long>> balancedBackendsPerBucketSeq) {
|
||||
private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds, List<Long> availableBeIds,
|
||||
ColocateTableIndex colocateIndex, SystemInfoService infoService,
|
||||
ClusterLoadStatistic statistic, List<List<Long>> balancedBackendsPerBucketSeq) {
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
int replicationNum = groupSchema.getReplicationNum();
|
||||
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(colocateIndex.getBackendsPerBucketSeq(groupId));
|
||||
@ -543,29 +312,49 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
}
|
||||
Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size());
|
||||
|
||||
// sort backends with replica num
|
||||
long srcBeId = -1;
|
||||
List<Integer> seqIndexes = null;
|
||||
boolean hasUnavailableBe = false;
|
||||
// first choose the unavailable be as src be
|
||||
for (Long beId : unavailableBeIds) {
|
||||
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
|
||||
if (seqIndexes.size() > 0) {
|
||||
srcBeId = beId;
|
||||
hasUnavailableBe = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// sort backends with replica num in desc order
|
||||
List<Map.Entry<Long, Long>> backendWithReplicaNum =
|
||||
getSortedBackendReplicaNumPairs(allAvailBackendIds, flatBackendsPerBucketSeq);
|
||||
// if there is only one available backend, end the outer loop
|
||||
if (backendWithReplicaNum.size() == 1) {
|
||||
LOG.info("there is only one available backend, end the outer loop in colocate group {}", groupId);
|
||||
break;
|
||||
getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, flatBackendsPerBucketSeq);
|
||||
if (seqIndexes == null || seqIndexes.size() <= 0) {
|
||||
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
|
||||
if (backendWithReplicaNum.size() <= 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
// choose max bucketId num be as src be
|
||||
srcBeId = backendWithReplicaNum.get(0).getKey();
|
||||
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
int i;
|
||||
if (hasUnavailableBe) {
|
||||
i = -1;
|
||||
} else {
|
||||
i = 0;
|
||||
}
|
||||
int j = backendWithReplicaNum.size() - 1;
|
||||
while (i < j) {
|
||||
boolean isThisRoundChanged = false;
|
||||
// we try to use a low backend to replace the high backend.
|
||||
// we try to use a low backend to replace the src backend.
|
||||
// if replace failed(eg: both backends are on some host), select next low backend and try(j--)
|
||||
Map.Entry<Long, Long> highBackend = backendWithReplicaNum.get(i);
|
||||
Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
|
||||
if (highBackend.getValue() - lowBackend.getValue() <= 1) {
|
||||
if ((!hasUnavailableBe) && (seqIndexes.size() - lowBackend.getValue()) <= 1) {
|
||||
// balanced
|
||||
break OUT;
|
||||
}
|
||||
|
||||
long srcBeId = highBackend.getKey();
|
||||
long destBeId = lowBackend.getKey();
|
||||
Backend destBe = infoService.getBackend(destBeId);
|
||||
if (destBe == null) {
|
||||
@ -573,18 +362,6 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* get the array indexes of elements in flatBackendsPerBucketSeq which equals to srcBeId
|
||||
* eg:
|
||||
* flatBackendsPerBucketSeq:
|
||||
* A B C A D E A F G A H I
|
||||
* and srcBeId is A.
|
||||
* so seqIndexes is:
|
||||
* 0 3 6 9
|
||||
*/
|
||||
List<Integer> seqIndexes = IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
|
||||
idx -> flatBackendsPerBucketSeq.get(idx).equals(srcBeId)).collect(Collectors.toList());
|
||||
|
||||
for (int seqIndex : seqIndexes) {
|
||||
// the bucket index.
|
||||
// eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
|
||||
@ -614,7 +391,7 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
"end outer loop in colocate group {}", groupId);
|
||||
break OUT;
|
||||
} else {
|
||||
// select another load backend and try again
|
||||
// select another low backend and try again
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -649,19 +426,85 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
return hostsPerBucketSeq;
|
||||
}
|
||||
|
||||
private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
|
||||
List<Long> flatBackendsPerBucketSeq) {
|
||||
private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds, Set<Long> unavailBackendIds,
|
||||
ClusterLoadStatistic statistic, List<Long> flatBackendsPerBucketSeq) {
|
||||
// backend id -> replica num, and sorted by replica num, descending.
|
||||
Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
|
||||
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||
// remove unavailable backend
|
||||
for (Long backendId : unavailBackendIds) {
|
||||
backendToReplicaNum.remove(backendId);
|
||||
}
|
||||
// add backends which are not in flatBackendsPerBucketSeq, with replication number 0
|
||||
for (Long backendId : allAvailBackendIds) {
|
||||
if (!backendToReplicaNum.containsKey(backendId)) {
|
||||
backendToReplicaNum.put(backendId, 0L);
|
||||
}
|
||||
}
|
||||
List<Map.Entry<Long, Long>> backendWithReplicaNum = backendToReplicaNum.entrySet().stream().sorted(
|
||||
Collections.reverseOrder(Map.Entry.comparingByValue())).collect(Collectors.toList());
|
||||
return backendWithReplicaNum;
|
||||
|
||||
return backendToReplicaNum
|
||||
.entrySet()
|
||||
.stream()
|
||||
.sorted((entry1, entry2) -> {
|
||||
if (!entry1.getValue().equals(entry2.getValue())) {
|
||||
return (int)(entry2.getValue() - entry1.getValue());
|
||||
}
|
||||
BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey());
|
||||
BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey());
|
||||
if (beStat1 == null || beStat2 == null) {
|
||||
return 0;
|
||||
}
|
||||
double loadScore1 = beStat1.getMixLoadScore();
|
||||
double loadScore2 = beStat2.getMixLoadScore();
|
||||
if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
|
||||
return 0;
|
||||
} else if (loadScore2 > loadScore1) {
|
||||
return 1;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/*
|
||||
* get the array indexes of elements in flatBackendsPerBucketSeq which equals to beId
|
||||
* eg:
|
||||
* flatBackendsPerBucketSeq:
|
||||
* A B C A D E A F G A H I
|
||||
* and srcBeId is A.
|
||||
* so seqIndexes is:
|
||||
* 0 3 6 9
|
||||
*/
|
||||
private List<Integer> getBeSeqIndexes(List<Long> flatBackendsPerBucketSeq, long beId) {
|
||||
return IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
|
||||
idx -> flatBackendsPerBucketSeq.get(idx).equals(beId)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
|
||||
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
|
||||
Set<Long> unavailableBeIds = Sets.newHashSet();
|
||||
long currTime = System.currentTimeMillis();
|
||||
for (Long backendId : backends) {
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
unavailableBeIds.add(backendId);
|
||||
} else if (!be.isAvailable()) {
|
||||
// 1. BE is dead for a long time
|
||||
// 2. BE is under decommission
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|
||||
|| be.isDecommissioned()) {
|
||||
unavailableBeIds.add(backendId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return unavailableBeIds;
|
||||
}
|
||||
|
||||
private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
|
||||
List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
|
||||
return allBackendIds.stream()
|
||||
.filter(id -> !unavailableBeIds.contains(id))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,12 +17,15 @@
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import com.google.common.collect.Sets;
|
||||
import mockit.Delegate;
|
||||
import org.apache.doris.catalog.ColocateGroupSchema;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
@ -34,23 +37,17 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class ColocateTableBalancerTest {
|
||||
|
||||
@Mocked
|
||||
private Catalog catalog;
|
||||
@Mocked
|
||||
private SystemInfoService infoService;
|
||||
|
||||
private TabletScheduler tabletScheduler;
|
||||
|
||||
private ColocateTableBalancer balancer = ColocateTableBalancer.getInstance();
|
||||
|
||||
|
||||
private Backend backend1;
|
||||
private Backend backend2;
|
||||
private Backend backend3;
|
||||
@ -61,6 +58,8 @@ public class ColocateTableBalancerTest {
|
||||
private Backend backend8;
|
||||
private Backend backend9;
|
||||
|
||||
private Map<Long, Double> mixLoadScores;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
backend1 = new Backend(1L, "192.168.1.1", 9050);
|
||||
@ -74,6 +73,29 @@ public class ColocateTableBalancerTest {
|
||||
backend8 = new Backend(8L, "192.168.1.8", 9050);
|
||||
backend9 = new Backend(9L, "192.168.1.8", 9050);
|
||||
|
||||
mixLoadScores = Maps.newHashMap();
|
||||
mixLoadScores.put(1L, 0.1);
|
||||
mixLoadScores.put(2L, 0.5);
|
||||
mixLoadScores.put(3L, 0.4);
|
||||
mixLoadScores.put(4L, 0.2);
|
||||
mixLoadScores.put(5L, 0.3);
|
||||
mixLoadScores.put(6L, 0.6);
|
||||
mixLoadScores.put(7L, 0.8);
|
||||
mixLoadScores.put(8L, 0.7);
|
||||
mixLoadScores.put(9L, 0.9);
|
||||
}
|
||||
|
||||
private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
|
||||
ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
|
||||
int replicationNum = 3;
|
||||
List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
|
||||
colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
|
||||
return colocateTableIndex;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBalance(@Mocked SystemInfoService infoService,
|
||||
@Mocked ClusterLoadStatistic statistic) {
|
||||
new Expectations() {
|
||||
{
|
||||
infoService.getBackend(1L);
|
||||
@ -103,20 +125,12 @@ public class ColocateTableBalancerTest {
|
||||
infoService.getBackend(9L);
|
||||
result = backend9;
|
||||
minTimes = 0;
|
||||
|
||||
statistic.getBackendLoadStatistic(anyLong);
|
||||
result = null;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
|
||||
ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
|
||||
int replicationNum = 3;
|
||||
List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
|
||||
colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
|
||||
return colocateTableIndex;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBalance() {
|
||||
GroupId groupId = new GroupId(10000, 10001);
|
||||
List<Column> distributionCols = Lists.newArrayList();
|
||||
distributionCols.add(new Column("k1", PrimitiveType.INT));
|
||||
@ -132,9 +146,8 @@ public class ColocateTableBalancerTest {
|
||||
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
|
||||
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
|
||||
colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
|
||||
System.out.println(balancedBackendsPerBucketSeq);
|
||||
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
|
||||
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
|
||||
List<List<Long>> expected = Lists.partition(
|
||||
Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 4L, 1L, 2L, 3L), 3);
|
||||
Assert.assertTrue(changed);
|
||||
@ -145,15 +158,50 @@ public class ColocateTableBalancerTest {
|
||||
Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
|
||||
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
|
||||
balancedBackendsPerBucketSeq.clear();
|
||||
changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
|
||||
colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
|
||||
changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
|
||||
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
|
||||
System.out.println(balancedBackendsPerBucketSeq);
|
||||
Assert.assertFalse(changed);
|
||||
Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFixBalanceEndlessLoop() {
|
||||
public void testFixBalanceEndlessLoop(@Mocked SystemInfoService infoService,
|
||||
@Mocked ClusterLoadStatistic statistic) {
|
||||
new Expectations() {
|
||||
{
|
||||
infoService.getBackend(1L);
|
||||
result = backend1;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(2L);
|
||||
result = backend2;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(3L);
|
||||
result = backend3;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(4L);
|
||||
result = backend4;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(5L);
|
||||
result = backend5;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(6L);
|
||||
result = backend6;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(7L);
|
||||
result = backend7;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(8L);
|
||||
result = backend8;
|
||||
minTimes = 0;
|
||||
infoService.getBackend(9L);
|
||||
result = backend9;
|
||||
minTimes = 0;
|
||||
statistic.getBackendLoadStatistic(anyLong);
|
||||
result = null;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
GroupId groupId = new GroupId(10000, 10001);
|
||||
List<Column> distributionCols = Lists.newArrayList();
|
||||
distributionCols.add(new Column("k1", PrimitiveType.INT));
|
||||
@ -168,8 +216,8 @@ public class ColocateTableBalancerTest {
|
||||
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
List<Long> allAvailBackendIds = Lists.newArrayList(7L);
|
||||
boolean changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
|
||||
colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
|
||||
boolean changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
|
||||
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
|
||||
Assert.assertFalse(changed);
|
||||
|
||||
// 2. all backends are checked but this round is not changed
|
||||
@ -180,8 +228,152 @@ public class ColocateTableBalancerTest {
|
||||
|
||||
balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
|
||||
changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
|
||||
colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
|
||||
changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
|
||||
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
|
||||
Assert.assertFalse(changed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
|
||||
new Expectations() {
|
||||
{
|
||||
statistic.getBackendLoadStatistic(anyLong);
|
||||
result = new Delegate<BackendLoadStatistic>() {
|
||||
BackendLoadStatistic delegate(Long beId) {
|
||||
return new FakeBackendLoadStatistic(beId, null, null, null);
|
||||
}
|
||||
};
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
// all buckets are on different be
|
||||
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
|
||||
Set<Long> unavailBackendIds = Sets.newHashSet(9L);
|
||||
List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
|
||||
List<Map.Entry<Long, Long>> backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
|
||||
allAvailBackendIds, unavailBackendIds, statistic, flatBackendsPerBucketSeq);
|
||||
long[] backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
|
||||
Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, backendIds);
|
||||
|
||||
// 0,1 bucket on same be and 5, 6 on same be
|
||||
flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 7L, 7L, 9L);
|
||||
backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
|
||||
statistic, flatBackendsPerBucketSeq);
|
||||
backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
|
||||
Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, backendIds);
|
||||
}
|
||||
|
||||
public final class FakeBackendLoadStatistic extends BackendLoadStatistic {
|
||||
public FakeBackendLoadStatistic(long beId, String clusterName, SystemInfoService infoService,
|
||||
TabletInvertedIndex invertedIndex) {
|
||||
super(beId, clusterName, infoService, invertedIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getMixLoadScore() {
|
||||
return mixLoadScores.get(getBeId());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBeSeqIndexes() {
|
||||
List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 2L, 3L, 4L, 2L);
|
||||
List<Integer> indexes = Deencapsulation.invoke(balancer, "getBeSeqIndexes", flatBackendsPerBucketSeq, 2L);
|
||||
Assert.assertArrayEquals(new int[]{1, 2, 5}, indexes.stream().mapToInt(i->i).toArray());
|
||||
System.out.println("backend1 id is " + backend1.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTableIndex,
|
||||
@Mocked SystemInfoService infoService,
|
||||
@Mocked Backend myBackend2,
|
||||
@Mocked Backend myBackend3,
|
||||
@Mocked Backend myBackend4,
|
||||
@Mocked Backend myBackend5
|
||||
) {
|
||||
GroupId groupId = new GroupId(10000, 10001);
|
||||
Set<Long> allBackendsInGroup = Sets.newHashSet(1L, 2L, 3L, 4L, 5L);
|
||||
new Expectations() {
|
||||
{
|
||||
infoService.getBackend(1L);
|
||||
result = null;
|
||||
minTimes = 0;
|
||||
|
||||
// backend2 is available
|
||||
infoService.getBackend(2L);
|
||||
result = myBackend2;
|
||||
minTimes = 0;
|
||||
myBackend2.isAvailable();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend3 not available, and dead for a long time
|
||||
infoService.getBackend(3L);
|
||||
result = myBackend3;
|
||||
minTimes = 0;
|
||||
myBackend3.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend3.isAlive();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend3.getLastUpdateMs();
|
||||
result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
|
||||
minTimes = 0;
|
||||
|
||||
// backend4 not available, and dead for a short time
|
||||
infoService.getBackend(4L);
|
||||
result = myBackend4;
|
||||
minTimes = 0;
|
||||
myBackend4.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend4.isAlive();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend4.getLastUpdateMs();
|
||||
result = System.currentTimeMillis();
|
||||
minTimes = 0;
|
||||
|
||||
// backend5 not available, and in decommission
|
||||
infoService.getBackend(5L);
|
||||
result = myBackend5;
|
||||
minTimes = 0;
|
||||
myBackend5.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend5.isAlive();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend5.isDecommissioned();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
colocateTableIndex.getBackendsByGroup(groupId);
|
||||
result = allBackendsInGroup;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
Set<Long> unavailableBeIds = Deencapsulation.invoke(balancer, "getUnavailableBeIdsInGroup", infoService, colocateTableIndex, groupId);
|
||||
System.out.println(unavailableBeIds);
|
||||
Assert.assertArrayEquals(new long[]{1L, 3L, 5L}, unavailableBeIds.stream().mapToLong(i->i).sorted().toArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
|
||||
List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
|
||||
new Expectations(){
|
||||
{
|
||||
infoService.getClusterBackendIds("cluster1", true);
|
||||
result = clusterAliveBackendIds;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
|
||||
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup","cluster1", infoService, unavailableBeIds);
|
||||
Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user