[improvement](tablet schedule) colocate balance between all groups (#23543)
This commit is contained in:
@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -79,6 +80,248 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
public static class BucketStatistic {
|
||||
public int tabletOrderIdx;
|
||||
public int totalReplicaNum;
|
||||
public long totalReplicaDataSize;
|
||||
|
||||
public BucketStatistic(int tabletOrderIdx, int totalReplicaNum, long totalReplicaDataSize) {
|
||||
this.tabletOrderIdx = tabletOrderIdx;
|
||||
this.totalReplicaNum = totalReplicaNum;
|
||||
this.totalReplicaDataSize = totalReplicaDataSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof BucketStatistic)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BucketStatistic other = (BucketStatistic) obj;
|
||||
return tabletOrderIdx == other.tabletOrderIdx && totalReplicaNum == other.totalReplicaNum
|
||||
&& totalReplicaDataSize == other.totalReplicaDataSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{ orderIdx: " + tabletOrderIdx + ", total replica num: " + totalReplicaNum
|
||||
+ ", total data size: " + totalReplicaDataSize + " }";
|
||||
}
|
||||
}
|
||||
|
||||
public static class BackendBuckets {
|
||||
private long beId;
|
||||
private Map<GroupId, List<Integer>> groupTabletOrderIndices = Maps.newHashMap();
|
||||
|
||||
public BackendBuckets(long beId) {
|
||||
this.beId = beId;
|
||||
}
|
||||
|
||||
// for test
|
||||
public Map<GroupId, List<Integer>> getGroupTabletOrderIndices() {
|
||||
return groupTabletOrderIndices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof BackendBuckets)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BackendBuckets other = (BackendBuckets) obj;
|
||||
return beId == other.beId && groupTabletOrderIndices.equals(other.groupTabletOrderIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{ backendId: " + beId + ", group order index: " + groupTabletOrderIndices + " }";
|
||||
}
|
||||
|
||||
public void addGroupTablet(GroupId groupId, int tabletOrderIdx) {
|
||||
List<Integer> indices = groupTabletOrderIndices.get(groupId);
|
||||
if (indices == null) {
|
||||
indices = Lists.newArrayList();
|
||||
groupTabletOrderIndices.put(groupId, indices);
|
||||
}
|
||||
indices.add(tabletOrderIdx);
|
||||
}
|
||||
|
||||
public void removeGroupTablet(GroupId groupId, int tabletOrderIdx) {
|
||||
List<Integer> indices = groupTabletOrderIndices.get(groupId);
|
||||
if (indices == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
indices.remove(Integer.valueOf(tabletOrderIdx));
|
||||
if (indices.isEmpty()) {
|
||||
groupTabletOrderIndices.remove(groupId);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containsGroupTablet(GroupId groupId, int tabletOrderIdx) {
|
||||
List<Integer> indices = groupTabletOrderIndices.get(groupId);
|
||||
if (indices == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return indices.indexOf(Integer.valueOf(tabletOrderIdx)) >= 0;
|
||||
}
|
||||
|
||||
public int getTotalReplicaNum(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
|
||||
int totalReplicaNum = 0;
|
||||
for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
|
||||
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
|
||||
if (bucketStatistics != null) {
|
||||
for (int tabletOrderIdx : entry.getValue()) {
|
||||
if (tabletOrderIdx < bucketStatistics.size()) {
|
||||
totalReplicaNum += bucketStatistics.get(tabletOrderIdx).totalReplicaNum;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return totalReplicaNum;
|
||||
}
|
||||
|
||||
public long getTotalReplicaDataSize(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
|
||||
long totalReplicaDataSize = 0;
|
||||
for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
|
||||
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
|
||||
if (bucketStatistics != null) {
|
||||
for (int tabletOrderIdx : entry.getValue()) {
|
||||
if (tabletOrderIdx < bucketStatistics.size()) {
|
||||
totalReplicaDataSize += bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return totalReplicaDataSize;
|
||||
}
|
||||
|
||||
public int getTotalBucketsNum() {
|
||||
return groupTabletOrderIndices.values().stream().mapToInt(indices -> indices.size()).sum();
|
||||
}
|
||||
|
||||
public int getGroupBucketsNum(GroupId groupId) {
|
||||
List<Integer> indices = groupTabletOrderIndices.get(groupId);
|
||||
if (indices == null) {
|
||||
return 0;
|
||||
} else {
|
||||
return indices.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class GlobalColocateStatistic {
|
||||
private Map<Long, BackendBuckets> backendBucketsMap = Maps.newHashMap();
|
||||
private Map<GroupId, List<BucketStatistic>> allGroupBucketsMap = Maps.newHashMap();
|
||||
private Map<Tag, Integer> allTagBucketNum = Maps.newHashMap();
|
||||
private static final BackendBuckets DUMMY_BE = new BackendBuckets(0);
|
||||
|
||||
public GlobalColocateStatistic() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof GlobalColocateStatistic)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
GlobalColocateStatistic other = (GlobalColocateStatistic) obj;
|
||||
return backendBucketsMap.equals(other.backendBucketsMap)
|
||||
&& allGroupBucketsMap.equals(other.allGroupBucketsMap)
|
||||
&& allTagBucketNum.equals(other.allTagBucketNum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{ backends: " + backendBucketsMap + ", groups: " + allGroupBucketsMap
|
||||
+ ", tag bucket num: " + allTagBucketNum + " }";
|
||||
}
|
||||
|
||||
Map<Long, BackendBuckets> getBackendBucketsMap() {
|
||||
return backendBucketsMap;
|
||||
}
|
||||
|
||||
Map<GroupId, List<BucketStatistic>> getAllGroupBucketsMap() {
|
||||
return allGroupBucketsMap;
|
||||
}
|
||||
|
||||
Map<Tag, Integer> getAllTagBucketNum() {
|
||||
return allTagBucketNum;
|
||||
}
|
||||
|
||||
public boolean moveTablet(GroupId groupId, int tabletOrderIdx,
|
||||
long srcBeId, long destBeId) {
|
||||
BackendBuckets srcBackendBuckets = backendBucketsMap.get(srcBeId);
|
||||
if (srcBackendBuckets == null || !srcBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BackendBuckets destBackendBuckets = backendBucketsMap.get(destBeId);
|
||||
if (destBackendBuckets == null) {
|
||||
destBackendBuckets = new BackendBuckets(destBeId);
|
||||
backendBucketsMap.put(destBeId, destBackendBuckets);
|
||||
}
|
||||
if (destBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
srcBackendBuckets.removeGroupTablet(groupId, tabletOrderIdx);
|
||||
destBackendBuckets.addGroupTablet(groupId, tabletOrderIdx);
|
||||
if (srcBackendBuckets.getTotalBucketsNum() == 0) {
|
||||
backendBucketsMap.remove(srcBeId);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public int getBackendTotalBucketNum(long backendId) {
|
||||
return backendBucketsMap.getOrDefault(backendId, DUMMY_BE).getTotalBucketsNum();
|
||||
}
|
||||
|
||||
public long getBackendTotalReplicaDataSize(long backendId) {
|
||||
return backendBucketsMap.getOrDefault(backendId, DUMMY_BE)
|
||||
.getTotalReplicaDataSize(allGroupBucketsMap);
|
||||
}
|
||||
|
||||
public long getBucketTotalReplicaDataSize(GroupId groupId, int tabletOrderIdx) {
|
||||
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(groupId);
|
||||
if (bucketStatistics != null && tabletOrderIdx < bucketStatistics.size()) {
|
||||
return bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
|
||||
} else {
|
||||
return 0L;
|
||||
}
|
||||
}
|
||||
|
||||
public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc, List<Set<Long>> backendBucketsSeq,
|
||||
List<Long> totalReplicaDataSizes, int totalReplicaNumPerBucket) {
|
||||
Preconditions.checkState(backendBucketsSeq.size() == totalReplicaDataSizes.size(),
|
||||
backendBucketsSeq.size() + " vs. " + totalReplicaDataSizes.size());
|
||||
List<BucketStatistic> bucketStatistics = Lists.newArrayList();
|
||||
for (int tabletOrderIdx = 0; tabletOrderIdx < backendBucketsSeq.size(); tabletOrderIdx++) {
|
||||
BucketStatistic bucket = new BucketStatistic(tabletOrderIdx, totalReplicaNumPerBucket,
|
||||
totalReplicaDataSizes.get(tabletOrderIdx));
|
||||
bucketStatistics.add(bucket);
|
||||
for (long backendId : backendBucketsSeq.get(tabletOrderIdx)) {
|
||||
BackendBuckets backendBuckets = backendBucketsMap.get(backendId);
|
||||
if (backendBuckets == null) {
|
||||
backendBuckets = new BackendBuckets(backendId);
|
||||
backendBucketsMap.put(backendId, backendBuckets);
|
||||
}
|
||||
backendBuckets.addGroupTablet(groupId, tabletOrderIdx);
|
||||
}
|
||||
}
|
||||
int bucketNum = backendBucketsSeq.size();
|
||||
replicaAlloc.getAllocMap().forEach((tag, count) -> {
|
||||
allTagBucketNum.put(tag, allTagBucketNum.getOrDefault(tag, 0) + bucketNum * count);
|
||||
});
|
||||
allGroupBucketsMap.put(groupId, bucketStatistics);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
/*
|
||||
* Each round, we do 2 steps:
|
||||
@ -92,8 +335,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* Otherwise, mark the group as stable
|
||||
*/
|
||||
protected void runAfterCatalogReady() {
|
||||
relocateAndBalanceGroup();
|
||||
matchGroup();
|
||||
relocateAndBalanceGroups();
|
||||
matchGroups();
|
||||
}
|
||||
|
||||
/*
|
||||
@ -134,17 +377,32 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* +-+ +-+ +-+ +-+
|
||||
* A B C D
|
||||
*/
|
||||
private void relocateAndBalanceGroup() {
|
||||
private void relocateAndBalanceGroups() {
|
||||
Set<GroupId> groupIds = Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds());
|
||||
|
||||
// balance only inside each group, excluded balance between all groups
|
||||
Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
|
||||
|
||||
if (!Config.disable_colocate_balance_between_groups
|
||||
&& !changeGroups.isEmpty()) {
|
||||
// balance both inside each group and between all groups
|
||||
relocateAndBalanceGroup(changeGroups, true);
|
||||
}
|
||||
}
|
||||
|
||||
private Set<GroupId> relocateAndBalanceGroup(Set<GroupId> groupIds, boolean balanceBetweenGroups) {
|
||||
Set<GroupId> changeGroups = Sets.newHashSet();
|
||||
if (Config.disable_colocate_balance) {
|
||||
return;
|
||||
return changeGroups;
|
||||
}
|
||||
|
||||
Env env = Env.getCurrentEnv();
|
||||
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
|
||||
SystemInfoService infoService = Env.getCurrentSystemInfo();
|
||||
|
||||
GlobalColocateStatistic globalColocateStatistic = buildGlobalColocateStatistic();
|
||||
|
||||
// get all groups
|
||||
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
|
||||
for (GroupId groupId : groupIds) {
|
||||
Map<Tag, LoadStatisticForTag> statisticMap = env.getTabletScheduler().getStatisticMap();
|
||||
if (statisticMap == null) {
|
||||
@ -182,13 +440,15 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
// try relocate or balance this group for specified tag
|
||||
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
|
||||
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex,
|
||||
infoService, statistic, balancedBackendsPerBucketSeq)) {
|
||||
infoService, statistic, globalColocateStatistic, balancedBackendsPerBucketSeq,
|
||||
balanceBetweenGroups)) {
|
||||
if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq,
|
||||
replicaAlloc)) {
|
||||
replicaAlloc)) {
|
||||
LOG.warn("relocate group {} succ, but replica allocation has change, old replica alloc {}",
|
||||
groupId, replicaAlloc);
|
||||
continue;
|
||||
}
|
||||
changeGroups.add(groupId);
|
||||
Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap = Maps.newHashMap();
|
||||
balancedBackendsPerBucketSeqMap.put(tag, balancedBackendsPerBucketSeq);
|
||||
ColocatePersistInfo info = ColocatePersistInfo
|
||||
@ -199,6 +459,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return changeGroups;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -206,7 +468,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* replicas, and mark that group as unstable.
|
||||
* If every replicas match the backends in group, mark that group as stable.
|
||||
*/
|
||||
private void matchGroup() {
|
||||
private void matchGroups() {
|
||||
long start = System.currentTimeMillis();
|
||||
CheckerCounter counter = new CheckerCounter();
|
||||
|
||||
@ -315,6 +577,83 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
counter.tabletInScheduler, counter.tabletNotReady, cost);
|
||||
}
|
||||
|
||||
/**
 * Builds a snapshot of the current colocate layout across ALL groups: which
 * backend holds which bucket, and the total replica data size of each bucket.
 * Groups whose schema is gone or whose bucket sequence is empty are skipped.
 */
private GlobalColocateStatistic buildGlobalColocateStatistic() {
    Env env = Env.getCurrentEnv();
    ColocateTableIndex colocateIndex = env.getColocateTableIndex();
    GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic();

    Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
    for (GroupId groupId : groupIds) {
        ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
        if (groupSchema == null) {
            // schema missing (group dropped concurrently?) -- skip this group
            continue;
        }
        ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
        List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
        List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
        if (backendBucketsSeq.isEmpty()) {
            continue;
        }

        // per-bucket accumulated replica data size, indexed by tablet order idx
        int totalReplicaNumPerBucket = 0;
        ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
        for (int i = 0; i < backendBucketsSeq.size(); i++) {
            totalReplicaDataSizes.add(0L);
        }

        for (Long tableId : tableIds) {
            long dbId = groupId.dbId;
            if (dbId == 0) {
                // dbId == 0 is treated as "unset" here; resolved via the table id
                // -- presumably a compatibility path, confirm against GroupId history
                dbId = groupId.getDbIdByTblId(tableId);
            }
            Database db = env.getInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
            if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
                continue;
            }

            // read lock guards partition/index/tablet traversal below
            olapTable.readLock();
            try {
                for (Partition partition : olapTable.getPartitions()) {
                    short replicationNum = replicaAlloc.getTotalReplicaNum();

                    // Here we only get VISIBLE indexes. All other indexes are not queryable.
                    // So it does not matter if tablets of other indexes are not matched.

                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                        // every visible index must have exactly one tablet per bucket
                        Preconditions.checkState(backendBucketsSeq.size() == index.getTablets().size(),
                                backendBucketsSeq.size() + " vs. " + index.getTablets().size());
                        int tabletOrderIdx = 0;
                        // one increment per visible (partition, index) pair: each
                        // contributes one tablet (replicationNum replicas) to every bucket
                        totalReplicaNumPerBucket++;
                        for (Long tabletId : index.getTabletIdsInOrder()) {
                            Set<Long> bucketsSeq = backendBucketsSeq.get(tabletOrderIdx);
                            Preconditions.checkState(bucketsSeq.size() == replicationNum,
                                    bucketsSeq.size() + " vs. " + replicationNum);
                            Tablet tablet = index.getTablet(tabletId);
                            // accumulate this tablet's size into its bucket's total
                            totalReplicaDataSizes.set(tabletOrderIdx,
                                    totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true));
                            tabletOrderIdx++;
                        }
                    }
                }
            } catch (Exception e) {
                // best-effort: log and keep statistics from the other tables
                LOG.warn("build group {} colocate statistic error", groupId, e);
                continue;
            } finally {
                olapTable.readUnlock();
            }
        }

        globalColocateStatistic.addGroup(groupId, replicaAlloc, backendBucketsSeq, totalReplicaDataSizes,
                totalReplicaNumPerBucket);
    }

    return globalColocateStatistic;
}
|
||||
|
||||
/*
|
||||
* Each balance is performed for a single workload group in a colocate group.
|
||||
* For example, if the replica allocation of a colocate group is {TagA: 2, TagB: 1},
|
||||
@ -373,8 +712,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* Return false if nothing changed.
|
||||
*/
|
||||
private boolean relocateAndBalance(GroupId groupId, Tag tag, Set<Long> unavailableBeIds, List<Long> availableBeIds,
|
||||
ColocateTableIndex colocateIndex, SystemInfoService infoService,
|
||||
LoadStatisticForTag statistic, List<List<Long>> balancedBackendsPerBucketSeq) {
|
||||
ColocateTableIndex colocateIndex, SystemInfoService infoService, LoadStatisticForTag statistic,
|
||||
GlobalColocateStatistic globalColocateStatistic, List<List<Long>> balancedBackendsPerBucketSeq,
|
||||
boolean balanceBetweenGroups) {
|
||||
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
|
||||
short replicaNum = groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
|
||||
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
|
||||
@ -383,7 +723,16 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
List<Long> flatBackendsPerBucketSeq = backendsPerBucketSeq.stream()
|
||||
.flatMap(List::stream).collect(Collectors.toList());
|
||||
|
||||
int tagTotalBucketNum = globalColocateStatistic.getAllTagBucketNum().getOrDefault(tag, 0);
|
||||
int availableBeNum = availableBeIds.size();
|
||||
int highTotalBucketNumPerBe = availableBeNum == 0 ? 0 :
|
||||
(tagTotalBucketNum + availableBeNum - 1) / availableBeNum;
|
||||
int lowTotalBucketNumPerBe = availableBeNum == 0 ? 0 : tagTotalBucketNum / availableBeNum;
|
||||
|
||||
boolean isChanged = false;
|
||||
int times = 0;
|
||||
List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
|
||||
|
||||
OUT:
|
||||
while (true) {
|
||||
// update backends and hosts at each round
|
||||
@ -394,27 +743,34 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
return false;
|
||||
}
|
||||
Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size());
|
||||
times++;
|
||||
if (times > 10 * backendsPerBucketSeq.size()) {
|
||||
LOG.warn("iterate too many times for relocate group: {}, times: {}, bucket num: {}",
|
||||
groupId, times, backendsPerBucketSeq.size());
|
||||
break;
|
||||
}
|
||||
|
||||
long srcBeId = -1;
|
||||
List<Integer> seqIndexes = null;
|
||||
boolean hasUnavailableBe = false;
|
||||
boolean srcBeUnavailable = false;
|
||||
// first choose the unavailable be as src be
|
||||
for (Long beId : unavailableBeIds) {
|
||||
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
|
||||
if (!seqIndexes.isEmpty()) {
|
||||
srcBeId = beId;
|
||||
hasUnavailableBe = true;
|
||||
srcBeUnavailable = true;
|
||||
LOG.info("find unavailable backend {} in colocate group: {}", beId, groupId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// sort backends with replica num in desc order
|
||||
List<Map.Entry<Long, Long>> backendWithReplicaNum =
|
||||
getSortedBackendReplicaNumPairs(availableBeIds,
|
||||
unavailableBeIds, statistic, flatBackendsPerBucketSeq);
|
||||
getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic,
|
||||
globalColocateStatistic, flatBackendsPerBucketSeq);
|
||||
|
||||
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
|
||||
if (backendWithReplicaNum.size() <= 1 && !hasUnavailableBe) {
|
||||
if (backendWithReplicaNum.size() <= 1 && !srcBeUnavailable) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -430,12 +786,40 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
// we try to use a low backend to replace the src backend.
|
||||
// if replace failed(eg: both backends are on some host), select next low backend and try(j--)
|
||||
Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
|
||||
if ((!hasUnavailableBe) && (seqIndexes.size() - lowBackend.getValue()) <= 1) {
|
||||
// balanced
|
||||
break OUT;
|
||||
long destBeId = lowBackend.getKey();
|
||||
if (!srcBeUnavailable) {
|
||||
long diffThisGroup = seqIndexes.size() - lowBackend.getValue();
|
||||
if (diffThisGroup < 1) {
|
||||
// balanced
|
||||
break OUT;
|
||||
}
|
||||
|
||||
// src's group bucket num = dest's group bucket num + 1
|
||||
// if move group bucket from src to dest, dest will be one more group num than src.
|
||||
// check global view
|
||||
//
|
||||
// suppose bucket num = 3, three BE A/B/C, two group group1/group2, then we have:
|
||||
//
|
||||
// A [ group1:bucket0, group2:bucket0]
|
||||
// B [ group1:bucket1, group2:bucket1]
|
||||
// C [ group1:bucket2, group2:bucket2]
|
||||
//
|
||||
// if we add a new BE D, for each group: bucketNum(A)=bucketNum(B)=bucketNum(C)=1, bucketNum(D)=0
|
||||
// so each group is balance, but in global groups view, it's not balance.
|
||||
// we should move one of the buckets to D
|
||||
if (diffThisGroup == 1) {
|
||||
if (!balanceBetweenGroups) {
|
||||
break OUT;
|
||||
}
|
||||
int srcTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
|
||||
int destTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId);
|
||||
if (srcTotalBucketNum <= highTotalBucketNumPerBe
|
||||
|| destTotalBucketNum >= lowTotalBucketNumPerBe) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long destBeId = lowBackend.getKey();
|
||||
Backend destBe = infoService.getBackend(destBeId);
|
||||
if (destBe == null) {
|
||||
LOG.info("backend {} does not exist", destBeId);
|
||||
@ -458,6 +842,14 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
continue;
|
||||
}
|
||||
|
||||
BackendLoadStatistic beStat = statistic.getBackendLoadStatistic(destBeId);
|
||||
if (beStat == null) {
|
||||
LOG.warn("not found backend {} statistic", destBeId);
|
||||
continue;
|
||||
}
|
||||
|
||||
int targetSeqIndex = -1;
|
||||
long minDataSizeDiff = Long.MAX_VALUE;
|
||||
for (int seqIndex : seqIndexes) {
|
||||
// the bucket index.
|
||||
// eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
|
||||
@ -465,26 +857,62 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
List<Long> backendsSet = backendsPerBucketSeq.get(bucketIndex);
|
||||
List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
|
||||
// the replicas of a tablet can not locate in same Backend or same host
|
||||
if (!backendsSet.contains(destBeId) && !hostsSet.contains(destBe.getHost())) {
|
||||
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
|
||||
flatBackendsPerBucketSeq.set(seqIndex, destBeId);
|
||||
LOG.info("replace backend {} with backend {} in colocate group {}, idx: {}",
|
||||
srcBeId, destBeId, groupId, seqIndex);
|
||||
// just replace one backend at a time, src and dest BE id should be recalculated because
|
||||
// flatBackendsPerBucketSeq is changed.
|
||||
isChanged = true;
|
||||
isThisRoundChanged = true;
|
||||
break;
|
||||
if (backendsSet.contains(destBeId) || hostsSet.contains(destBe.getHost())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
|
||||
long bucketDataSize =
|
||||
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
|
||||
|
||||
resultPaths.clear();
|
||||
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, true);
|
||||
if (!st.ok()) {
|
||||
LOG.debug("backend {} is unable to fit in group {}, tablet order idx {}, data size {}",
|
||||
destBeId, groupId, bucketIndex, bucketDataSize);
|
||||
continue;
|
||||
}
|
||||
|
||||
long newSrcBeTotalReplicaDataSize = globalColocateStatistic.getBackendTotalReplicaDataSize(srcBeId)
|
||||
- bucketDataSize;
|
||||
long newDestBeTotalReplicaDataSize =
|
||||
globalColocateStatistic.getBackendTotalReplicaDataSize(destBeId) + bucketDataSize;
|
||||
long dataSizeDiff = Math.abs(newSrcBeTotalReplicaDataSize - newDestBeTotalReplicaDataSize);
|
||||
if (targetSeqIndex < 0 || dataSizeDiff < minDataSizeDiff) {
|
||||
targetSeqIndex = seqIndex;
|
||||
minDataSizeDiff = dataSizeDiff;
|
||||
}
|
||||
}
|
||||
|
||||
if (isThisRoundChanged) {
|
||||
// we found a change
|
||||
break;
|
||||
if (targetSeqIndex < 0) {
|
||||
// we use next node as dst node
|
||||
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
|
||||
srcBeId, destBeId, groupId);
|
||||
continue;
|
||||
}
|
||||
// we use next node as dst node
|
||||
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
|
||||
srcBeId, destBeId, groupId);
|
||||
|
||||
int tabletOrderIdx = targetSeqIndex / replicaNum;
|
||||
int oldSrcThisGroup = seqIndexes.size();
|
||||
long oldDestThisGroup = lowBackend.getValue();
|
||||
int oldSrcBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
|
||||
int oldDestBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId);
|
||||
LOG.debug("OneMove: group {}, src {}, this group {}, all group {}, dest {}, this group {}, "
|
||||
+ "all group {}", groupId, srcBeId, oldSrcThisGroup, oldSrcBucketNum, destBeId,
|
||||
oldDestThisGroup, oldDestBucketNum);
|
||||
Preconditions.checkState(
|
||||
globalColocateStatistic.moveTablet(groupId, tabletOrderIdx, srcBeId, destBeId));
|
||||
Preconditions.checkState(oldSrcBucketNum - 1
|
||||
== globalColocateStatistic.getBackendTotalBucketNum(srcBeId));
|
||||
Preconditions.checkState(oldDestBucketNum + 1
|
||||
== globalColocateStatistic.getBackendTotalBucketNum(destBeId));
|
||||
flatBackendsPerBucketSeq.set(targetSeqIndex, destBeId);
|
||||
// just replace one backend at a time, src and dest BE id should be recalculated because
|
||||
// flatBackendsPerBucketSeq is changed.
|
||||
isChanged = true;
|
||||
isThisRoundChanged = true;
|
||||
LOG.info("replace backend {} with backend {} in colocate group {}, idx: {}",
|
||||
srcBeId, destBeId, groupId, targetSeqIndex);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!isThisRoundChanged) {
|
||||
@ -642,7 +1070,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
}
|
||||
|
||||
private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
|
||||
Set<Long> unavailBackendIds, LoadStatisticForTag statistic, List<Long> flatBackendsPerBucketSeq) {
|
||||
Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
|
||||
GlobalColocateStatistic globalColocateStatistic,
|
||||
List<Long> flatBackendsPerBucketSeq) {
|
||||
// backend id -> replica num, and sorted by replica num, descending.
|
||||
Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
|
||||
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||
@ -664,20 +1094,27 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
if (!entry1.getValue().equals(entry2.getValue())) {
|
||||
return (int) (entry2.getValue() - entry1.getValue());
|
||||
}
|
||||
|
||||
// From java 7, sorting needs to satisfy reflexivity, transitivity and symmetry.
|
||||
// Otherwise it will raise exception "Comparison method violates its general contract".
|
||||
|
||||
BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey());
|
||||
BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey());
|
||||
if (beStat1 == null || beStat2 == null) {
|
||||
return 0;
|
||||
if (beStat1 == null && beStat2 == null) {
|
||||
return 0;
|
||||
} else {
|
||||
return beStat1 == null ? 1 : -1;
|
||||
}
|
||||
}
|
||||
double loadScore1 = beStat1.getMixLoadScore();
|
||||
double loadScore2 = beStat2.getMixLoadScore();
|
||||
if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
|
||||
return 0;
|
||||
} else if (loadScore2 > loadScore1) {
|
||||
return 1;
|
||||
} else {
|
||||
return -1;
|
||||
int cmp = Double.compare(loadScore2, loadScore1);
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
|
||||
return Long.compare(entry1.getKey(), entry2.getKey());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@ -1138,7 +1138,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// it will also delete replica from tablet inverted index.
|
||||
tabletCtx.deleteReplica(replica);
|
||||
|
||||
if (force) {
|
||||
if (force || FeConstants.runningUnitTest) {
|
||||
// send the delete replica task.
|
||||
// also, this may not be necessary, but delete it will make things simpler.
|
||||
// NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report
|
||||
|
||||
Reference in New Issue
Block a user