[BUG] Fix colocate balance bug when there is a decommissioned BE (#4955)
We should ignore decommissioned BEs when selecting BEs to balance a group's bucketSeq.
This commit is contained in:
@ -151,10 +151,10 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
continue;
|
||||
}
|
||||
|
||||
Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
|
||||
List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
|
||||
Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
|
||||
List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), infoService);
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
|
||||
if (relocateAndBalance(groupId, unavailableBeIdsInGroup, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
|
||||
colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
|
||||
catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
|
||||
@ -484,27 +484,44 @@ public class ColocateTableBalancer extends MasterDaemon {
|
||||
private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
|
||||
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
|
||||
Set<Long> unavailableBeIds = Sets.newHashSet();
|
||||
long currTime = System.currentTimeMillis();
|
||||
for (Long backendId : backends) {
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
if (!checkBackendAvailable(backendId, infoService)) {
|
||||
unavailableBeIds.add(backendId);
|
||||
} else if (!be.isAvailable()) {
|
||||
// 1. BE is dead for a long time
|
||||
// 2. BE is under decommission
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|
||||
|| be.isDecommissioned()) {
|
||||
unavailableBeIds.add(backendId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return unavailableBeIds;
|
||||
}
|
||||
|
||||
private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
|
||||
List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
|
||||
return allBackendIds.stream()
|
||||
.filter(id -> !unavailableBeIds.contains(id))
|
||||
.collect(Collectors.toList());
|
||||
private List<Long> getAvailableBeIds(String cluster, SystemInfoService infoService) {
|
||||
// collect all backend ids of the cluster into allBackendIds, then check each BE's availability using checkBackendAvailable
|
||||
// backend stopped for a short period of time is still considered available
|
||||
List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, false);
|
||||
List<Long> availableBeIds = Lists.newArrayList();
|
||||
for (Long backendId : allBackendIds) {
|
||||
if (checkBackendAvailable(backendId, infoService)) {
|
||||
availableBeIds.add(backendId);
|
||||
}
|
||||
}
|
||||
return availableBeIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the given backend is available.
|
||||
* backend stopped for a short period of time is still considered available
|
||||
*/
|
||||
private boolean checkBackendAvailable(Long backendId, SystemInfoService infoService) {
|
||||
long currTime = System.currentTimeMillis();
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
return false;
|
||||
} else if (!be.isAvailable()) {
|
||||
// 1. BE is dead for a long time
|
||||
// 2. BE is under decommission
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|
||||
|| be.isDecommissioned()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -362,18 +362,75 @@ public class ColocateTableBalancerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
|
||||
List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
|
||||
public void testGetAvailableBeIds(@Mocked SystemInfoService infoService,
|
||||
@Mocked Backend myBackend2,
|
||||
@Mocked Backend myBackend3,
|
||||
@Mocked Backend myBackend4,
|
||||
@Mocked Backend myBackend5) {
|
||||
List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
|
||||
new Expectations(){
|
||||
{
|
||||
infoService.getClusterBackendIds("cluster1", true);
|
||||
result = clusterAliveBackendIds;
|
||||
infoService.getClusterBackendIds("cluster1", false);
|
||||
result = clusterBackendIds;
|
||||
minTimes = 0;
|
||||
|
||||
infoService.getBackend(1L);
|
||||
result = null;
|
||||
minTimes = 0;
|
||||
|
||||
// backend2 is available
|
||||
infoService.getBackend(2L);
|
||||
result = myBackend2;
|
||||
minTimes = 0;
|
||||
myBackend2.isAvailable();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend3 not available, and dead for a long time
|
||||
infoService.getBackend(3L);
|
||||
result = myBackend3;
|
||||
minTimes = 0;
|
||||
myBackend3.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend3.isAlive();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend3.getLastUpdateMs();
|
||||
result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
|
||||
minTimes = 0;
|
||||
|
||||
// backend4 is still considered available: it is not alive, but has been dead only for a short time
|
||||
infoService.getBackend(4L);
|
||||
result = myBackend4;
|
||||
minTimes = 0;
|
||||
myBackend4.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend4.isAlive();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend4.getLastUpdateMs();
|
||||
result = System.currentTimeMillis();
|
||||
minTimes = 0;
|
||||
|
||||
// backend5 not available, and in decommission
|
||||
infoService.getBackend(5L);
|
||||
result = myBackend5;
|
||||
minTimes = 0;
|
||||
myBackend5.isAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend5.isAlive();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend5.isDecommissioned();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
|
||||
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup","cluster1", infoService, unavailableBeIds);
|
||||
Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
|
||||
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIds","cluster1", infoService);
|
||||
Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user