[BUG] Fix colocate balance bug when no available BE (#5079)
This commit is contained in:
@ -327,26 +327,21 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
// sort backends with replica num in desc order
|
||||
List<Map.Entry<Long, Long>> backendWithReplicaNum =
|
||||
getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, flatBackendsPerBucketSeq);
|
||||
if (seqIndexes == null || seqIndexes.size() <= 0) {
|
||||
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
|
||||
if (backendWithReplicaNum.size() <= 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
|
||||
if (backendWithReplicaNum.size() <= 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (seqIndexes == null || seqIndexes.size() <= 0) {
|
||||
// choose max bucketId num be as src be
|
||||
Preconditions.checkState(backendsPerBucketSeq.size() > 0);
|
||||
srcBeId = backendWithReplicaNum.get(0).getKey();
|
||||
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
|
||||
}
|
||||
|
||||
int i;
|
||||
if (hasUnavailableBe) {
|
||||
i = -1;
|
||||
} else {
|
||||
i = 0;
|
||||
}
|
||||
int j = backendWithReplicaNum.size() - 1;
|
||||
while (i < j) {
|
||||
boolean isThisRoundChanged = false;
|
||||
boolean isThisRoundChanged = false;
|
||||
for (int j = backendWithReplicaNum.size() - 1; j >= 0; j--) {
|
||||
// we try to use a low backend to replace the src backend.
|
||||
// if replace failed(eg: both backends are on some host), select next low backend and try(j--)
|
||||
Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
|
||||
@ -362,6 +357,11 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if we found src_id == dst_id we skip to next
|
||||
if (srcBeId == destBeId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int seqIndex : seqIndexes) {
|
||||
// the bucket index.
|
||||
// eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
|
||||
@ -381,23 +381,23 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
if (!isThisRoundChanged) {
|
||||
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
|
||||
srcBeId, destBeId, groupId);
|
||||
if (--j == i) {
|
||||
// if all backends are checked but this round is not changed,
|
||||
// we should end the outer loop to avoid endless loops
|
||||
LOG.info("all backends are checked but this round is not changed, " +
|
||||
"end outer loop in colocate group {}", groupId);
|
||||
break OUT;
|
||||
} else {
|
||||
// select another low backend and try again
|
||||
continue;
|
||||
}
|
||||
if (isThisRoundChanged) {
|
||||
// we found a change
|
||||
break;
|
||||
}
|
||||
// we use next node as dst node
|
||||
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
|
||||
srcBeId, destBeId, groupId);
|
||||
}
|
||||
|
||||
if (!isThisRoundChanged) {
|
||||
// if all backends are checked but this round is not changed,
|
||||
// we should end the loop
|
||||
LOG.info("all backends are checked but this round is not changed, " +
|
||||
"end outer loop in colocate group {}", groupId);
|
||||
break;
|
||||
} // end inner loop
|
||||
}
|
||||
// end inner loop
|
||||
}
|
||||
|
||||
if (isChanged) {
|
||||
|
||||
@ -233,6 +233,37 @@ public class ColocateTableBalancerTest {
|
||||
Assert.assertFalse(changed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFixBalanceEndlessLoop2(@Mocked SystemInfoService infoService,
|
||||
@Mocked ClusterLoadStatistic statistic) {
|
||||
new Expectations() {
|
||||
{
|
||||
statistic.getBackendLoadStatistic(anyLong);
|
||||
result = new Delegate<BackendLoadStatistic>() {
|
||||
BackendLoadStatistic delegate(Long beId) {
|
||||
return new FakeBackendLoadStatistic(beId, null, null, null);
|
||||
}
|
||||
};
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
GroupId groupId = new GroupId(10000, 10001);
|
||||
List<Column> distributionCols = Lists.newArrayList();
|
||||
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, distributionCols, 5, (short) 1);
|
||||
Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
|
||||
group2Schema.put(groupId, groupSchema);
|
||||
|
||||
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
|
||||
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
|
||||
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
Set<Long> unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
|
||||
List<Long> availBackendIds = Lists.newArrayList();
|
||||
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, unAvailBackendIds, availBackendIds,
|
||||
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
|
||||
Assert.assertFalse(changed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
|
||||
new Expectations() {
|
||||
|
||||
Reference in New Issue
Block a user