cherry pick from #36444
This commit is contained in:
@ -429,7 +429,7 @@ public class ColocateTableIndex implements Writable {
|
||||
public Set<GroupId> getAllGroupIds() {
|
||||
readLock();
|
||||
try {
|
||||
return group2Tables.keySet();
|
||||
return Sets.newHashSet(group2Tables.keySet());
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
@ -378,7 +378,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* A B C D
|
||||
*/
|
||||
private void relocateAndBalanceGroups() {
|
||||
Set<GroupId> groupIds = Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds());
|
||||
Set<GroupId> groupIds = Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds();
|
||||
|
||||
// balance only inside each group, excluded balance between all groups
|
||||
Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
|
||||
@ -410,6 +410,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
}
|
||||
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
LOG.info("Not found colocate group {}, maybe delete", groupId);
|
||||
continue;
|
||||
}
|
||||
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
try {
|
||||
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
|
||||
@ -480,13 +484,18 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
// check each group
|
||||
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
|
||||
for (GroupId groupId : groupIds) {
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
LOG.info("Not found colocate group {}, maybe delete", groupId);
|
||||
continue;
|
||||
}
|
||||
|
||||
List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
|
||||
List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
|
||||
if (backendBucketsSeq.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
String unstableReason = null;
|
||||
OUT:
|
||||
@ -588,6 +597,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
for (GroupId groupId : groupIds) {
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
LOG.info("Not found colocate group {}, maybe delete", groupId);
|
||||
continue;
|
||||
}
|
||||
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
@ -718,6 +728,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
GlobalColocateStatistic globalColocateStatistic, List<List<Long>> balancedBackendsPerBucketSeq,
|
||||
boolean balanceBetweenGroups) {
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
LOG.info("Not found colocate group {}, maybe delete", groupId);
|
||||
return false;
|
||||
}
|
||||
short replicaNum = groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
|
||||
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
|
||||
colocateIndex.getBackendsPerBucketSeqByTag(groupId, tag));
|
||||
|
||||
Reference in New Issue
Block a user