diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1560334c11..8e1c8761de 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -712,6 +712,10 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_colocate_balance = false;
+    @ConfField(mutable = true, masterOnly = true, description = {"是否启用group间的均衡",
+            "whether to allow colocate balance between all groups"})
+    public static boolean disable_colocate_balance_between_groups = false;
+
     /**
      * The default user resource publishing timeout.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index fce07ed2a5..358185dd60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,6 +80,248 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
         return INSTANCE;
     }
 
+    public static class BucketStatistic {
+        public int tabletOrderIdx;
+        public int totalReplicaNum;
+        public long totalReplicaDataSize;
+
+        public BucketStatistic(int tabletOrderIdx, int totalReplicaNum, long totalReplicaDataSize) {
+            this.tabletOrderIdx = tabletOrderIdx;
+            this.totalReplicaNum = totalReplicaNum;
+            this.totalReplicaDataSize = totalReplicaDataSize;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof BucketStatistic)) {
+                return false;
+            }
+
+            BucketStatistic other = (BucketStatistic) obj;
+            return tabletOrderIdx == other.tabletOrderIdx && totalReplicaNum == other.totalReplicaNum
+                    && totalReplicaDataSize == other.totalReplicaDataSize;
+        }
+
+        @Override
+        public String toString() {
+            return "{ orderIdx: " + tabletOrderIdx + ", total replica num: " + totalReplicaNum
+                    + ", total data size: " + totalReplicaDataSize + " }";
+        }
+    }
+
+    public static class BackendBuckets {
+        private long beId;
+        private Map<GroupId, List<Integer>> groupTabletOrderIndices = Maps.newHashMap();
+
+        public BackendBuckets(long beId) {
+            this.beId = beId;
+        }
+
+        // for test
+        public Map<GroupId, List<Integer>> getGroupTabletOrderIndices() {
+            return groupTabletOrderIndices;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof BackendBuckets)) {
+                return false;
+            }
+
+            BackendBuckets other = (BackendBuckets) obj;
+            return beId == other.beId && groupTabletOrderIndices.equals(other.groupTabletOrderIndices);
+        }
+
+        @Override
+        public String toString() {
+            return "{ backendId: " + beId + ", group order index: " + groupTabletOrderIndices + " }";
+        }
+
+        public void addGroupTablet(GroupId groupId, int tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                indices = Lists.newArrayList();
+                groupTabletOrderIndices.put(groupId, indices);
+            }
+            indices.add(tabletOrderIdx);
+        }
+
+        public void removeGroupTablet(GroupId groupId, int tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return;
+            }
+
+            indices.remove(Integer.valueOf(tabletOrderIdx));
+            if (indices.isEmpty()) {
+                groupTabletOrderIndices.remove(groupId);
+            }
+        }
+
+        public boolean containsGroupTablet(GroupId groupId, int tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return false;
+            }
+
+            return indices.indexOf(Integer.valueOf(tabletOrderIdx)) >= 0;
+        }
+
+        public int getTotalReplicaNum(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
+            int totalReplicaNum = 0;
+            for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
+                List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
+                if (bucketStatistics != null) {
+                    for (int tabletOrderIdx : entry.getValue()) {
+                        if (tabletOrderIdx < bucketStatistics.size()) {
+                            totalReplicaNum += bucketStatistics.get(tabletOrderIdx).totalReplicaNum;
+                        }
+                    }
+                }
+            }
+
+            return totalReplicaNum;
+        }
+
+        public long getTotalReplicaDataSize(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
+            long totalReplicaDataSize = 0;
+            for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
+                List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
+                if (bucketStatistics != null) {
+                    for (int tabletOrderIdx : entry.getValue()) {
+                        if (tabletOrderIdx < bucketStatistics.size()) {
+                            totalReplicaDataSize += bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
+                        }
+                    }
+                }
+            }
+
+            return totalReplicaDataSize;
+        }
+
+        public int getTotalBucketsNum() {
+            return groupTabletOrderIndices.values().stream().mapToInt(indices -> indices.size()).sum();
+        }
+
+        public int getGroupBucketsNum(GroupId groupId) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return 0;
+            } else {
+                return indices.size();
+            }
+        }
+    }
+
+    public static class GlobalColocateStatistic {
+        private Map<Long, BackendBuckets> backendBucketsMap = Maps.newHashMap();
+        private Map<GroupId, List<BucketStatistic>> allGroupBucketsMap = Maps.newHashMap();
+        private Map<Tag, Integer> allTagBucketNum = Maps.newHashMap();
+        private static final BackendBuckets DUMMY_BE = new BackendBuckets(0);
+
+        public GlobalColocateStatistic() {
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof GlobalColocateStatistic)) {
+                return false;
+            }
+
+            GlobalColocateStatistic other = (GlobalColocateStatistic) obj;
+            return backendBucketsMap.equals(other.backendBucketsMap)
+                    && allGroupBucketsMap.equals(other.allGroupBucketsMap)
+                    && allTagBucketNum.equals(other.allTagBucketNum);
+        }
+
+        @Override
+        public String toString() {
+            return "{ backends: " + backendBucketsMap + ", groups: " + allGroupBucketsMap
+                    + ", tag bucket num: " + allTagBucketNum + " }";
+        }
+
+        Map<Long, BackendBuckets> getBackendBucketsMap() {
+            return backendBucketsMap;
+        }
+
+        Map<GroupId, List<BucketStatistic>> getAllGroupBucketsMap() {
+            return allGroupBucketsMap;
+        }
+
+        Map<Tag, Integer> getAllTagBucketNum() {
+            return allTagBucketNum;
+        }
+
+        public boolean moveTablet(GroupId groupId, int tabletOrderIdx,
+                long srcBeId, long destBeId) {
+            BackendBuckets srcBackendBuckets = backendBucketsMap.get(srcBeId);
+            if (srcBackendBuckets == null || !srcBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
+                return false;
+            }
+
+            BackendBuckets destBackendBuckets = backendBucketsMap.get(destBeId);
+            if (destBackendBuckets == null) {
+                destBackendBuckets = new BackendBuckets(destBeId);
+                backendBucketsMap.put(destBeId, destBackendBuckets);
+            }
+            if (destBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
+                return false;
+            }
+
+            srcBackendBuckets.removeGroupTablet(groupId, tabletOrderIdx);
+            destBackendBuckets.addGroupTablet(groupId, tabletOrderIdx);
+            if (srcBackendBuckets.getTotalBucketsNum() == 0) {
+                backendBucketsMap.remove(srcBeId);
+            }
+
+            return true;
+        }
+
+        public int
getBackendTotalBucketNum(long backendId) { + return backendBucketsMap.getOrDefault(backendId, DUMMY_BE).getTotalBucketsNum(); + } + + public long getBackendTotalReplicaDataSize(long backendId) { + return backendBucketsMap.getOrDefault(backendId, DUMMY_BE) + .getTotalReplicaDataSize(allGroupBucketsMap); + } + + public long getBucketTotalReplicaDataSize(GroupId groupId, int tabletOrderIdx) { + List bucketStatistics = allGroupBucketsMap.get(groupId); + if (bucketStatistics != null && tabletOrderIdx < bucketStatistics.size()) { + return bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize; + } else { + return 0L; + } + } + + public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc, List> backendBucketsSeq, + List totalReplicaDataSizes, int totalReplicaNumPerBucket) { + Preconditions.checkState(backendBucketsSeq.size() == totalReplicaDataSizes.size(), + backendBucketsSeq.size() + " vs. " + totalReplicaDataSizes.size()); + List bucketStatistics = Lists.newArrayList(); + for (int tabletOrderIdx = 0; tabletOrderIdx < backendBucketsSeq.size(); tabletOrderIdx++) { + BucketStatistic bucket = new BucketStatistic(tabletOrderIdx, totalReplicaNumPerBucket, + totalReplicaDataSizes.get(tabletOrderIdx)); + bucketStatistics.add(bucket); + for (long backendId : backendBucketsSeq.get(tabletOrderIdx)) { + BackendBuckets backendBuckets = backendBucketsMap.get(backendId); + if (backendBuckets == null) { + backendBuckets = new BackendBuckets(backendId); + backendBucketsMap.put(backendId, backendBuckets); + } + backendBuckets.addGroupTablet(groupId, tabletOrderIdx); + } + } + int bucketNum = backendBucketsSeq.size(); + replicaAlloc.getAllocMap().forEach((tag, count) -> { + allTagBucketNum.put(tag, allTagBucketNum.getOrDefault(tag, 0) + bucketNum * count); + }); + allGroupBucketsMap.put(groupId, bucketStatistics); + } + + } + @Override /* * Each round, we do 2 steps: @@ -92,8 +335,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { * Otherwise, mark the group as stable */ protected void runAfterCatalogReady() { - relocateAndBalanceGroup(); - matchGroup(); + relocateAndBalanceGroups(); + matchGroups(); } /* @@ -134,17 +377,32 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { * +-+ +-+ +-+ +-+ * A B C D */ - private void relocateAndBalanceGroup() { + private void relocateAndBalanceGroups() { + Set groupIds = Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds()); + + // balance only inside each group, excluded balance between all groups + Set changeGroups = relocateAndBalanceGroup(groupIds, false); + + if (!Config.disable_colocate_balance_between_groups + && !changeGroups.isEmpty()) { + // balance both inside each group and between all groups + relocateAndBalanceGroup(changeGroups, true); + } + } + + private Set relocateAndBalanceGroup(Set groupIds, boolean balanceBetweenGroups) { + Set changeGroups = Sets.newHashSet(); if (Config.disable_colocate_balance) { - return; + return changeGroups; } Env env = Env.getCurrentEnv(); ColocateTableIndex colocateIndex = env.getColocateTableIndex(); SystemInfoService infoService = Env.getCurrentSystemInfo(); + GlobalColocateStatistic globalColocateStatistic = buildGlobalColocateStatistic(); + // get all groups - Set groupIds = colocateIndex.getAllGroupIds(); for (GroupId groupId : groupIds) { Map statisticMap = env.getTabletScheduler().getStatisticMap(); if (statisticMap == null) { @@ -182,13 +440,15 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { // try relocate or balance this 
group for specified tag List> balancedBackendsPerBucketSeq = Lists.newArrayList(); if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex, - infoService, statistic, balancedBackendsPerBucketSeq)) { + infoService, statistic, globalColocateStatistic, balancedBackendsPerBucketSeq, + balanceBetweenGroups)) { if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq, - replicaAlloc)) { + replicaAlloc)) { LOG.warn("relocate group {} succ, but replica allocation has change, old replica alloc {}", groupId, replicaAlloc); continue; } + changeGroups.add(groupId); Map>> balancedBackendsPerBucketSeqMap = Maps.newHashMap(); balancedBackendsPerBucketSeqMap.put(tag, balancedBackendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo @@ -199,6 +459,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { } } } + + return changeGroups; } /* @@ -206,7 +468,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { * replicas, and mark that group as unstable. * If every replicas match the backends in group, mark that group as stable. */ - private void matchGroup() { + private void matchGroups() { long start = System.currentTimeMillis(); CheckerCounter counter = new CheckerCounter(); @@ -315,6 +577,83 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { counter.tabletInScheduler, counter.tabletNotReady, cost); } + private GlobalColocateStatistic buildGlobalColocateStatistic() { + Env env = Env.getCurrentEnv(); + ColocateTableIndex colocateIndex = env.getColocateTableIndex(); + GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic(); + + Set groupIds = colocateIndex.getAllGroupIds(); + for (GroupId groupId : groupIds) { + ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId); + if (groupSchema == null) { + continue; + } + ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc(); + List tableIds = colocateIndex.getAllTableIds(groupId); + List> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId); + if (backendBucketsSeq.isEmpty()) { + continue; + } + + int totalReplicaNumPerBucket = 0; + ArrayList totalReplicaDataSizes = Lists.newArrayList(); + for (int i = 0; i < backendBucketsSeq.size(); i++) { + totalReplicaDataSizes.add(0L); + } + + for (Long tableId : tableIds) { + long dbId = groupId.dbId; + if (dbId == 0) { + dbId = groupId.getDbIdByTblId(tableId); + } + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); + if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) { + continue; + } + + olapTable.readLock(); + try { + for (Partition partition : olapTable.getPartitions()) { + short replicationNum = replicaAlloc.getTotalReplicaNum(); + + // Here we only get VISIBLE indexes. All other indexes are not queryable. + // So it does not matter if tablets of other indexes are not matched. + + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + Preconditions.checkState(backendBucketsSeq.size() == index.getTablets().size(), + backendBucketsSeq.size() + " vs. " + index.getTablets().size()); + int tabletOrderIdx = 0; + totalReplicaNumPerBucket++; + for (Long tabletId : index.getTabletIdsInOrder()) { + Set bucketsSeq = backendBucketsSeq.get(tabletOrderIdx); + Preconditions.checkState(bucketsSeq.size() == replicationNum, + bucketsSeq.size() + " vs. 
" + replicationNum); + Tablet tablet = index.getTablet(tabletId); + totalReplicaDataSizes.set(tabletOrderIdx, + totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true)); + tabletOrderIdx++; + } + } + } + } catch (Exception e) { + LOG.warn("build group {} colocate statistic error", groupId, e); + continue; + } finally { + olapTable.readUnlock(); + } + } + + globalColocateStatistic.addGroup(groupId, replicaAlloc, backendBucketsSeq, totalReplicaDataSizes, + totalReplicaNumPerBucket); + } + + return globalColocateStatistic; + } + /* * Each balance is performed for a single workload group in a colocate group. * For example, if the replica allocation of a colocate group is {TagA: 2, TagB: 1}, @@ -373,8 +712,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { * Return false if nothing changed. */ private boolean relocateAndBalance(GroupId groupId, Tag tag, Set unavailableBeIds, List availableBeIds, - ColocateTableIndex colocateIndex, SystemInfoService infoService, - LoadStatisticForTag statistic, List> balancedBackendsPerBucketSeq) { + ColocateTableIndex colocateIndex, SystemInfoService infoService, LoadStatisticForTag statistic, + GlobalColocateStatistic globalColocateStatistic, List> balancedBackendsPerBucketSeq, + boolean balanceBetweenGroups) { ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId); short replicaNum = groupSchema.getReplicaAlloc().getReplicaNumByTag(tag); List> backendsPerBucketSeq = Lists.newArrayList( @@ -383,7 +723,16 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { List flatBackendsPerBucketSeq = backendsPerBucketSeq.stream() .flatMap(List::stream).collect(Collectors.toList()); + int tagTotalBucketNum = globalColocateStatistic.getAllTagBucketNum().getOrDefault(tag, 0); + int availableBeNum = availableBeIds.size(); + int highTotalBucketNumPerBe = availableBeNum == 0 ? 0 : + (tagTotalBucketNum + availableBeNum - 1) / availableBeNum; + int lowTotalBucketNumPerBe = availableBeNum == 0 ? 
0 : tagTotalBucketNum / availableBeNum; + boolean isChanged = false; + int times = 0; + List resultPaths = Lists.newArrayList(); + OUT: while (true) { // update backends and hosts at each round @@ -394,27 +743,34 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { return false; } Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size()); + times++; + if (times > 10 * backendsPerBucketSeq.size()) { + LOG.warn("iterate too many times for relocate group: {}, times: {}, bucket num: {}", + groupId, times, backendsPerBucketSeq.size()); + break; + } long srcBeId = -1; List seqIndexes = null; - boolean hasUnavailableBe = false; + boolean srcBeUnavailable = false; // first choose the unavailable be as src be for (Long beId : unavailableBeIds) { seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId); if (!seqIndexes.isEmpty()) { srcBeId = beId; - hasUnavailableBe = true; + srcBeUnavailable = true; LOG.info("find unavailable backend {} in colocate group: {}", beId, groupId); break; } } + // sort backends with replica num in desc order List> backendWithReplicaNum = - getSortedBackendReplicaNumPairs(availableBeIds, - unavailableBeIds, statistic, flatBackendsPerBucketSeq); + getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, + globalColocateStatistic, flatBackendsPerBucketSeq); // if there is only one available backend and no unavailable bucketId to relocate, end the outer loop - if (backendWithReplicaNum.size() <= 1 && !hasUnavailableBe) { + if (backendWithReplicaNum.size() <= 1 && !srcBeUnavailable) { break; } @@ -430,12 +786,40 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { // we try to use a low backend to replace the src backend. // if replace failed(eg: both backends are on some host), select next low backend and try(j--) Map.Entry lowBackend = backendWithReplicaNum.get(j); - if ((!hasUnavailableBe) && (seqIndexes.size() - lowBackend.getValue()) <= 1) { - // balanced - break OUT; + long destBeId = lowBackend.getKey(); + if (!srcBeUnavailable) { + long diffThisGroup = seqIndexes.size() - lowBackend.getValue(); + if (diffThisGroup < 1) { + // balanced + break OUT; + } + + // src's group bucket num = dest's group bucket num + 1 + // if move group bucket from src to dest, dest will be one more group num than src. + // check global view + // + // suppose bucket num = 3, three BE A/B/C, two group group1/group2, then we have: + // + // A [ group1:bucket0, group2:bucket0] + // B [ group1:bucket1, group2:bucket1] + // C [ group1:bucket2, group2:bucket2] + // + // if we add a new BE D, for each group: bucketNum(A)=bucketNum(B)=bucketNum(C)=1, bucketNum(D)=0 + // so each group is balance, but in global groups view, it's not balance. 
+ // we should move one of the buckets to D + if (diffThisGroup == 1) { + if (!balanceBetweenGroups) { + break OUT; + } + int srcTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId); + int destTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId); + if (srcTotalBucketNum <= highTotalBucketNumPerBe + || destTotalBucketNum >= lowTotalBucketNumPerBe) { + continue; + } + } } - long destBeId = lowBackend.getKey(); Backend destBe = infoService.getBackend(destBeId); if (destBe == null) { LOG.info("backend {} does not exist", destBeId); @@ -458,6 +842,14 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { continue; } + BackendLoadStatistic beStat = statistic.getBackendLoadStatistic(destBeId); + if (beStat == null) { + LOG.warn("not found backend {} statistic", destBeId); + continue; + } + + int targetSeqIndex = -1; + long minDataSizeDiff = Long.MAX_VALUE; for (int seqIndex : seqIndexes) { // the bucket index. // eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0. @@ -465,26 +857,62 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { List backendsSet = backendsPerBucketSeq.get(bucketIndex); List hostsSet = hostsPerBucketSeq.get(bucketIndex); // the replicas of a tablet can not locate in same Backend or same host - if (!backendsSet.contains(destBeId) && !hostsSet.contains(destBe.getHost())) { - Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId); - flatBackendsPerBucketSeq.set(seqIndex, destBeId); - LOG.info("replace backend {} with backend {} in colocate group {}, idx: {}", - srcBeId, destBeId, groupId, seqIndex); - // just replace one backend at a time, src and dest BE id should be recalculated because - // flatBackendsPerBucketSeq is changed. 
- isChanged = true; - isThisRoundChanged = true; - break; + if (backendsSet.contains(destBeId) || hostsSet.contains(destBe.getHost())) { + continue; + } + + Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId); + long bucketDataSize = + globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex); + + resultPaths.clear(); + BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, true); + if (!st.ok()) { + LOG.debug("backend {} is unable to fit in group {}, tablet order idx {}, data size {}", + destBeId, groupId, bucketIndex, bucketDataSize); + continue; + } + + long newSrcBeTotalReplicaDataSize = globalColocateStatistic.getBackendTotalReplicaDataSize(srcBeId) + - bucketDataSize; + long newDestBeTotalReplicaDataSize = + globalColocateStatistic.getBackendTotalReplicaDataSize(destBeId) + bucketDataSize; + long dataSizeDiff = Math.abs(newSrcBeTotalReplicaDataSize - newDestBeTotalReplicaDataSize); + if (targetSeqIndex < 0 || dataSizeDiff < minDataSizeDiff) { + targetSeqIndex = seqIndex; + minDataSizeDiff = dataSizeDiff; } } - if (isThisRoundChanged) { - // we found a change - break; + if (targetSeqIndex < 0) { + // we use next node as dst node + LOG.info("unable to replace backend {} with backend {} in colocate group {}", + srcBeId, destBeId, groupId); + continue; } - // we use next node as dst node - LOG.info("unable to replace backend {} with backend {} in colocate group {}", - srcBeId, destBeId, groupId); + + int tabletOrderIdx = targetSeqIndex / replicaNum; + int oldSrcThisGroup = seqIndexes.size(); + long oldDestThisGroup = lowBackend.getValue(); + int oldSrcBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId); + int oldDestBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId); + LOG.debug("OneMove: group {}, src {}, this group {}, all group {}, dest {}, this group {}, " + + "all group {}", groupId, srcBeId, oldSrcThisGroup, oldSrcBucketNum, destBeId, + oldDestThisGroup, oldDestBucketNum); + Preconditions.checkState( + globalColocateStatistic.moveTablet(groupId, tabletOrderIdx, srcBeId, destBeId)); + Preconditions.checkState(oldSrcBucketNum - 1 + == globalColocateStatistic.getBackendTotalBucketNum(srcBeId)); + Preconditions.checkState(oldDestBucketNum + 1 + == globalColocateStatistic.getBackendTotalBucketNum(destBeId)); + flatBackendsPerBucketSeq.set(targetSeqIndex, destBeId); + // just replace one backend at a time, src and dest BE id should be recalculated because + // flatBackendsPerBucketSeq is changed. + isChanged = true; + isThisRoundChanged = true; + LOG.info("replace backend {} with backend {} in colocate group {}, idx: {}", + srcBeId, destBeId, groupId, targetSeqIndex); + break; } if (!isThisRoundChanged) { @@ -642,7 +1070,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { } private List> getSortedBackendReplicaNumPairs(List allAvailBackendIds, - Set unavailBackendIds, LoadStatisticForTag statistic, List flatBackendsPerBucketSeq) { + Set unavailBackendIds, LoadStatisticForTag statistic, + GlobalColocateStatistic globalColocateStatistic, + List flatBackendsPerBucketSeq) { // backend id -> replica num, and sorted by replica num, descending. 
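For context on the replacement-selection logic above: relocateAndBalance now keeps the candidate bucket (targetSeqIndex) whose move leaves the source and destination backends closest in total data size, instead of taking the first legal bucket. A minimal standalone sketch of that choice with illustrative sizes; the class and method names here are hypothetical and not part of the patch:

    // Illustrative only: pick the bucket whose move minimizes |srcTotal - destTotal| afterwards.
    import java.util.Arrays;
    import java.util.List;

    class PickBucketSketch {
        static int pickBucket(List<Long> bucketSizes, long srcTotal, long destTotal) {
            int target = -1;
            long minDiff = Long.MAX_VALUE;
            for (int i = 0; i < bucketSizes.size(); i++) {
                long size = bucketSizes.get(i);
                // data-size gap between the two backends if this bucket were moved
                long diff = Math.abs((srcTotal - size) - (destTotal + size));
                if (target < 0 || diff < minDiff) {
                    target = i;
                    minDiff = diff;
                }
            }
            return target;
        }

        public static void main(String[] args) {
            // src holds 100 GB, dest 40 GB; moving the 30 GB bucket leaves them at 70 vs 70.
            System.out.println(pickBucket(Arrays.asList(10L, 30L, 50L), 100L, 40L)); // prints 1
        }
    }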
Map backendToReplicaNum = flatBackendsPerBucketSeq.stream() .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); @@ -664,20 +1094,27 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { if (!entry1.getValue().equals(entry2.getValue())) { return (int) (entry2.getValue() - entry1.getValue()); } + + // From java 7, sorting needs to satisfy reflexivity, transitivity and symmetry. + // Otherwise it will raise exception "Comparison method violates its general contract". + BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey()); BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey()); if (beStat1 == null || beStat2 == null) { - return 0; + if (beStat1 == null && beStat2 == null) { + return 0; + } else { + return beStat1 == null ? 1 : -1; + } } double loadScore1 = beStat1.getMixLoadScore(); double loadScore2 = beStat2.getMixLoadScore(); - if (Math.abs(loadScore1 - loadScore2) < 1e-6) { - return 0; - } else if (loadScore2 > loadScore1) { - return 1; - } else { - return -1; + int cmp = Double.compare(loadScore2, loadScore1); + if (cmp != 0) { + return cmp; } + + return Long.compare(entry1.getKey(), entry2.getKey()); }) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 510f039333..a550a9ee07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1138,7 +1138,7 @@ public class TabletScheduler extends MasterDaemon { // it will also delete replica from tablet inverted index. tabletCtx.deleteReplica(replica); - if (force) { + if (force || FeConstants.runningUnitTest) { // send the delete replica task. // also, this may not be necessary, but delete it will make things simpler. // NOTICE: only delete the replica from meta may not work. 
sometimes we can depend on tablet report diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 98b5912936..a05c63b812 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -330,4 +330,48 @@ public class CatalogTestUtil { backend.setAlive(true); return backend; } + + public static long getTabletDataSize(long tabletId) { + Env env = Env.getCurrentEnv(); + TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex(); + TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); + if (tabletMeta == null) { + return -1L; + } + + long dbId = tabletMeta.getDbId(); + long tableId = tabletMeta.getTableId(); + long partitionId = tabletMeta.getPartitionId(); + long indexId = tabletMeta.getIndexId(); + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + return -1L; + } + Table table = db.getTableNullable(tableId); + if (table == null) { + return -1L; + } + if (table.getType() != Table.TableType.OLAP) { + return -1L; + } + OlapTable olapTable = (OlapTable) table; + olapTable.readLock(); + try { + Partition partition = olapTable.getPartition(partitionId); + if (partition == null) { + return -1L; + } + MaterializedIndex materializedIndex = partition.getIndex(indexId); + if (materializedIndex == null) { + return -1L; + } + Tablet tablet = materializedIndex.getTablet(tabletId); + if (tablet == null) { + return -1L; + } + return tablet.getDataSize(true); + } finally { + olapTable.readUnlock(); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java new file mode 100644 index 0000000000..596a06f668 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.clone; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Replica; +import org.apache.doris.system.Backend; + +import com.google.common.collect.Maps; +import com.google.common.collect.Table; + +import java.util.List; +import java.util.Map; + +public class BalanceStatistic { + public Map backendTotalDataSize; + public Map backendTotalReplicaNum; + + private BalanceStatistic(Map backendTotalDataSize, + Map backendTotalReplicaNum) { + this.backendTotalDataSize = backendTotalDataSize; + this.backendTotalReplicaNum = backendTotalReplicaNum; + } + + public static BalanceStatistic getCurrentBalanceStatistic() { + Map backendTotalDataSize = Maps.newHashMap(); + Map backendTotalReplicaNum = Maps.newHashMap(); + List backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + backends.forEach(be -> { + backendTotalDataSize.put(be.getId(), 0L); + backendTotalReplicaNum.put(be.getId(), 0); + }); + + Table replicaMetaTable = + Env.getCurrentInvertedIndex().getReplicaMetaTable(); + for (Table.Cell cell : replicaMetaTable.cellSet()) { + long beId = cell.getColumnKey(); + Replica replica = cell.getValue(); + backendTotalDataSize.put(beId, backendTotalDataSize.get(beId) + replica.getDataSize()); + backendTotalReplicaNum.put(beId, backendTotalReplicaNum.get(beId) + 1); + } + + return new BalanceStatistic(backendTotalDataSize, backendTotalReplicaNum); + } + + public Map getBackendTotalDataSize() { + return backendTotalDataSize; + } + + public Map getBackendTotalReplicaNum() { + return backendTotalReplicaNum; + } + + public long getBeMinTotalDataSize() { + return backendTotalDataSize.values().stream().min(Long::compare).get(); + } + + public long getBeMaxTotalDataSize() { + return backendTotalDataSize.values().stream().max(Long::compare).get(); + } + + public int getBeMinTotalReplicaNum() { + return backendTotalReplicaNum.values().stream().min(Integer::compare).get(); + } + + public int getBeMaxTotalReplicaNum() { + return backendTotalReplicaNum.values().stream().max(Integer::compare).get(); + } + + public void printToStdout() { + int minTotalReplicaNum = getBeMinTotalReplicaNum(); + int maxTotalReplicaNum = getBeMaxTotalReplicaNum(); + long minTotalDataSize = getBeMinTotalDataSize(); + long maxTotalDataSize = getBeMaxTotalDataSize(); + + System.out.println(""); + System.out.println("=== backend min total replica num: " + minTotalReplicaNum); + System.out.println("=== backend max total replica num: " + maxTotalReplicaNum); + System.out.println("=== max / min : " + (maxTotalReplicaNum / (double) minTotalReplicaNum)); + + System.out.println(""); + System.out.println("=== min total data size: " + minTotalDataSize); + System.out.println("=== max total data size: " + maxTotalDataSize); + System.out.println("=== max / min : " + (maxTotalDataSize / (double) minTotalDataSize)); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java new file mode 100644 index 0000000000..526de17ce0 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.DdlExecutor; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.UtFrameUtils; + +import com.google.common.collect.Maps; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +public class ColocateTableCheckerAndBalancerPerfTest { + private static String runningDir = "fe/mocked/ColocateTableCheckerAndBalancerPerfTest/" + + UUID.randomUUID().toString() + "/"; + + private static ConnectContext connectContext; + private static final int TEMP_DISALBE_BE_NUM = 2; + private static List backends; + + @BeforeClass + public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; + FeConstants.enableInternalSchemaDb = false; + Config.tablet_checker_interval_ms = 100; + Config.tablet_schedule_interval_ms = 100; + Config.enable_round_robin_create_tablet = false; + Config.disable_balance = true; + Config.schedule_batch_size = 400; + Config.schedule_slot_num_per_hdd_path = 1000; + Config.disable_colocate_balance = true; + Config.disable_tablet_scheduler = true; + UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 6); + + backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + for (Backend be : backends) { + for (DiskInfo diskInfo : be.getDisks().values()) { + diskInfo.setTotalCapacityB(10L << 40); + diskInfo.setDataUsedCapacityB(1L); + diskInfo.setAvailableCapacityB( + diskInfo.getTotalCapacityB() - diskInfo.getDataUsedCapacityB()); + } + } + Map tagMap = Maps.newHashMap(); + tagMap.put(Tag.TYPE_LOCATION, "zone_a"); + for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) { + backends.get(i).setTagMap(tagMap); + } + + // create connect context + connectContext = UtFrameUtils.createDefaultCtx(); + } + + @AfterClass + public static void tearDown() { + UtFrameUtils.cleanDorisFeDir(runningDir); + } + + @Test + public void testRelocateAndBalance() throws Exception { + + Env env = Env.getCurrentEnv(); + String createDbStmtStr = "create database test;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext); + DdlExecutor.execute(env, createDbStmt); + + Random random = new Random(); + final int groupNum = 100; + for (int groupIndex = 0; groupIndex <= groupNum; groupIndex++) { + int tableNum = 1 + random.nextInt(10); + 
for (int tableIndex = 0; tableIndex < tableNum; tableIndex++) { + String sql = String.format("CREATE TABLE test.table_%s_%s\n" + + "( k1 int, k2 int, v1 int )\n" + + "ENGINE=OLAP\n" + + "UNIQUE KEY (k1,k2)\n" + + "DISTRIBUTED BY HASH(k2) BUCKETS 11\n" + + "PROPERTIES('colocate_with' = 'group_%s');", + groupIndex, tableIndex, groupIndex); + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + try { + DdlExecutor.execute(env, createTableStmt); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + + BalanceStatistic beforeBalanceStatistic = BalanceStatistic.getCurrentBalanceStatistic(); + Assert.assertEquals("group: " + groupIndex + ", table: " + tableIndex + ", " + + beforeBalanceStatistic.getBackendTotalReplicaNum(), + 0, beforeBalanceStatistic.getBeMinTotalReplicaNum()); + } + } + + ColocateTableIndex colocateIndex = env.getColocateTableIndex(); + Set groupIds = colocateIndex.getAllGroupIds(); + + RebalancerTestUtil.updateReplicaDataSize(100L << 10, 10, 10); + RebalancerTestUtil.updateReplicaPathHash(); + + BalanceStatistic beforeBalanceStatistic = BalanceStatistic.getCurrentBalanceStatistic(); + Assert.assertEquals("" + beforeBalanceStatistic.getBackendTotalReplicaNum(), + 0, beforeBalanceStatistic.getBeMinTotalReplicaNum()); + + // all groups stable + Thread.sleep(1000); + Assert.assertTrue("some groups are unstable", + groupIds.stream().noneMatch(groupId -> colocateIndex.isGroupUnstable(groupId))); + + // after enable colocate balance and some backends return, it should relocate all groups. + // and they will be unstable + Map tagMap = backends.get(TEMP_DISALBE_BE_NUM).getTagMap(); + for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) { + backends.get(i).setTagMap(tagMap); + } + Config.disable_colocate_balance = false; + Thread.sleep(1000); + Assert.assertTrue("some groups are stable", + groupIds.stream().allMatch(groupId -> colocateIndex.isGroupUnstable(groupId))); + + + // after enable scheduler, the unstable groups should shed their tablets and change to stable + Config.disable_tablet_scheduler = false; + for (int i = 0; true; i++) { + Thread.sleep(1000); + + boolean allStable = groupIds.stream().noneMatch( + groupId -> colocateIndex.isGroupUnstable(groupId)); + + if (allStable) { + break; + } + + Assert.assertTrue("some groups are unstable", i < 60); + } + + System.out.println("=== before colocate relocate and balance:"); + beforeBalanceStatistic.printToStdout(); + Assert.assertEquals("" + beforeBalanceStatistic.getBackendTotalReplicaNum(), + 0, beforeBalanceStatistic.getBeMinTotalReplicaNum()); + Assert.assertEquals("" + beforeBalanceStatistic.getBackendTotalDataSize(), + 0, beforeBalanceStatistic.getBeMinTotalDataSize()); + long beforeDataSizeDiff = beforeBalanceStatistic.getBeMaxTotalDataSize() + - beforeBalanceStatistic.getBeMinTotalDataSize(); + int beforeReplicaNumDiff = beforeBalanceStatistic.getBeMaxTotalReplicaNum() + - beforeBalanceStatistic.getBeMinTotalReplicaNum(); + + BalanceStatistic afterBalanceStatistic = BalanceStatistic.getCurrentBalanceStatistic(); + System.out.println(""); + System.out.println("=== after colocate relocate and balance:"); + afterBalanceStatistic.printToStdout(); + + Assert.assertTrue("" + afterBalanceStatistic.getBackendTotalReplicaNum(), + afterBalanceStatistic.getBeMinTotalReplicaNum() > 0); + Assert.assertTrue("" + afterBalanceStatistic.getBackendTotalDataSize(), + afterBalanceStatistic.getBeMinTotalDataSize() > 0); + long afterDataSizeDiff = 
afterBalanceStatistic.getBeMaxTotalDataSize() + - afterBalanceStatistic.getBeMinTotalDataSize(); + int afterReplicaNumDiff = afterBalanceStatistic.getBeMaxTotalReplicaNum() + - afterBalanceStatistic.getBeMinTotalReplicaNum(); + Assert.assertTrue(afterDataSizeDiff <= beforeDataSizeDiff); + Assert.assertTrue(afterReplicaNumDiff <= beforeReplicaNumDiff); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java index b159dee523..ee33862c95 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java @@ -24,12 +24,16 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BackendBuckets; +import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BucketStatistic; +import org.apache.doris.clone.ColocateTableCheckerAndBalancer.GlobalColocateStatistic; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -106,6 +110,20 @@ public class ColocateTableCheckerAndBalancerTest { return colocateTableIndex; } + private GlobalColocateStatistic createGlobalColocateStatistic(ColocateTableIndex colocateTableIndex, + GroupId groupId) { + GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic(); + List> backendsPerBucketSeq = colocateTableIndex.getBackendsPerBucketSeqSet(groupId); + List totalReplicaDataSizes = Lists.newArrayList(); + for (int i = 0; i < backendsPerBucketSeq.size(); i++) { + totalReplicaDataSizes.add(1L); + } + ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) backendsPerBucketSeq.get(0).size()); + globalColocateStatistic.addGroup(groupId, replicaAlloc, backendsPerBucketSeq, totalReplicaDataSizes, 3); + + return globalColocateStatistic; + } + @Test public void testBalance(@Mocked SystemInfoService infoService, @Mocked LoadStatisticForTag statistic) { @@ -140,7 +158,11 @@ public class ColocateTableCheckerAndBalancerTest { minTimes = 0; statistic.getBackendLoadStatistic(anyLong); - result = null; + result = new Delegate() { + BackendLoadStatistic delegate(Long beId) { + return new FakeBackendLoadStatistic(beId, null, null); + } + }; minTimes = 0; } }; @@ -158,24 +180,28 @@ public class ColocateTableCheckerAndBalancerTest { Lists.newArrayList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L)); Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema); + GlobalColocateStatistic globalColocateStatistic = createGlobalColocateStatistic(colocateTableIndex, groupId); List> balancedBackendsPerBucketSeq = Lists.newArrayList(); List allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG, new HashSet(), allAvailBackendIds, - colocateTableIndex, infoService, statistic, 
balancedBackendsPerBucketSeq); + colocateTableIndex, infoService, statistic, globalColocateStatistic, + balancedBackendsPerBucketSeq, false); List> expected = Lists.partition( - Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 4L, 1L, 2L, 3L), 3); - Assert.assertTrue(changed); + Lists.newArrayList(8L, 5L, 6L, 5L, 6L, 7L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L), 3); + Assert.assertTrue("" + globalColocateStatistic, changed); Assert.assertEquals(expected, balancedBackendsPerBucketSeq); // 2. balance a already balanced group colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L)); + globalColocateStatistic = createGlobalColocateStatistic(colocateTableIndex, groupId); Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema); balancedBackendsPerBucketSeq.clear(); changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG, new HashSet(), allAvailBackendIds, - colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq); + colocateTableIndex, infoService, statistic, globalColocateStatistic, + balancedBackendsPerBucketSeq, false); System.out.println(balancedBackendsPerBucketSeq); Assert.assertFalse(changed); Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty()); @@ -214,7 +240,11 @@ public class ColocateTableCheckerAndBalancerTest { result = backend9; minTimes = 0; statistic.getBackendLoadStatistic(anyLong); - result = null; + result = new Delegate() { + BackendLoadStatistic delegate(Long beId) { + return new FakeBackendLoadStatistic(beId, null, null); + } + }; minTimes = 0; } }; @@ -229,24 +259,28 @@ public class ColocateTableCheckerAndBalancerTest { // 1. only one available backend // [[7], [7], [7], [7], [7]] ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L)); + GlobalColocateStatistic globalColocateStatistic = createGlobalColocateStatistic(colocateTableIndex, groupId); Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema); List> balancedBackendsPerBucketSeq = Lists.newArrayList(); List allAvailBackendIds = Lists.newArrayList(7L); boolean changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG, - new HashSet(), allAvailBackendIds, colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq); + new HashSet(), allAvailBackendIds, colocateTableIndex, infoService, statistic, globalColocateStatistic, + balancedBackendsPerBucketSeq, false); Assert.assertFalse(changed); // 2. 
all backends are checked but this round is not changed // [[7], [7], [7], [7], [7]] // and add new backends 8, 9 that are on the same host with 7 colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L)); + globalColocateStatistic = createGlobalColocateStatistic(colocateTableIndex, groupId); Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema); balancedBackendsPerBucketSeq = Lists.newArrayList(); allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L); changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG, - new HashSet(), allAvailBackendIds, colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq); + new HashSet(), allAvailBackendIds, colocateTableIndex, infoService, statistic, globalColocateStatistic, + balancedBackendsPerBucketSeq, false); Assert.assertFalse(changed); } @@ -271,13 +305,15 @@ public class ColocateTableCheckerAndBalancerTest { group2Schema.put(groupId, groupSchema); ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L)); + GlobalColocateStatistic globalColocateStatistic = createGlobalColocateStatistic(colocateTableIndex, groupId); Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema); List> balancedBackendsPerBucketSeq = Lists.newArrayList(); Set unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); List availBackendIds = Lists.newArrayList(); boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG, - unAvailBackendIds, availBackendIds, colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq); + unAvailBackendIds, availBackendIds, colocateTableIndex, infoService, statistic, globalColocateStatistic, + balancedBackendsPerBucketSeq, false); Assert.assertFalse(changed); } @@ -295,19 +331,20 @@ public class ColocateTableCheckerAndBalancerTest { } }; + GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic(); // all buckets are on different be List allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); Set unavailBackendIds = Sets.newHashSet(9L); List flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); List> backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", - allAvailBackendIds, unavailBackendIds, statistic, flatBackendsPerBucketSeq); + allAvailBackendIds, unavailBackendIds, statistic, globalColocateStatistic, flatBackendsPerBucketSeq); long[] backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray(); Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, backendIds); // 0,1 bucket on same be and 5, 6 on same be flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 7L, 7L, 9L); backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds, - statistic, flatBackendsPerBucketSeq); + statistic, globalColocateStatistic, flatBackendsPerBucketSeq); backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray(); Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, backendIds); } @@ -322,6 +359,12 @@ public class ColocateTableCheckerAndBalancerTest { public double getMixLoadScore() { return mixLoadScores.get(getBeId()); } + + @Override + public BalanceStatus isFit(long tabletSize, TStorageMedium medium, List result, + boolean isSupplement) { + return BalanceStatus.OK; + } } @Test @@ -599,4 
+642,51 @@ public class ColocateTableCheckerAndBalancerTest { System.out.println(availableBeIds); Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i -> i).sorted().toArray()); } + + @Test + public void testGlobalColocateStatistic() { + GroupId groupId1 = new GroupId(1L, 10000L); + GroupId groupId2 = new GroupId(2L, 20000L); + GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic(); + globalColocateStatistic.addGroup(groupId1, new ReplicaAllocation((short) 2), + Lists.newArrayList(Sets.newHashSet(1001L, 1002L), Sets.newHashSet(1002L, 1003L), + Sets.newHashSet(1001L, 1003L)), + Lists.newArrayList(100L, 200L, 300L), 5); + globalColocateStatistic.addGroup(groupId2, new ReplicaAllocation((short) 1), + Lists.newArrayList(Sets.newHashSet(1001L), Sets.newHashSet(1002L), + Sets.newHashSet(1003L), Sets.newHashSet(1001L)), + Lists.newArrayList(100L, 200L, 300L, 400L), 7); + + Map backendBucketsMap = globalColocateStatistic.getBackendBucketsMap(); + BackendBuckets backendBuckets1 = backendBucketsMap.get(1001L); + Assert.assertNotNull(backendBuckets1); + Assert.assertEquals(Lists.newArrayList(0, 2), + backendBuckets1.getGroupTabletOrderIndices().get(groupId1)); + Assert.assertEquals(Lists.newArrayList(0, 3), + backendBuckets1.getGroupTabletOrderIndices().get(groupId2)); + BackendBuckets backendBuckets2 = backendBucketsMap.get(1002L); + Assert.assertNotNull(backendBuckets2); + Assert.assertEquals(Lists.newArrayList(0, 1), + backendBuckets2.getGroupTabletOrderIndices().get(groupId1)); + Assert.assertEquals(Lists.newArrayList(1), + backendBuckets2.getGroupTabletOrderIndices().get(groupId2)); + BackendBuckets backendBuckets3 = backendBucketsMap.get(1003L); + Assert.assertNotNull(backendBuckets3); + Assert.assertEquals(Lists.newArrayList(1, 2), + backendBuckets3.getGroupTabletOrderIndices().get(groupId1)); + Assert.assertEquals(Lists.newArrayList(2), + backendBuckets3.getGroupTabletOrderIndices().get(groupId2)); + + Map> allGroupBucketsMap = globalColocateStatistic.getAllGroupBucketsMap(); + Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 5, 100L), new BucketStatistic(1, 5, 200L), + new BucketStatistic(2, 5, 300L)), + allGroupBucketsMap.get(groupId1)); + Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 7, 100L), new BucketStatistic(1, 7, 200L), + new BucketStatistic(2, 7, 300L), new BucketStatistic(3, 7, 400L)), + allGroupBucketsMap.get(groupId2)); + + Map expectAllTagBucketNum = Maps.newHashMap(); + expectAllTagBucketNum.put(Tag.DEFAULT_BACKEND_TAG, 10); + Assert.assertEquals(expectAllTagBucketNum, globalColocateStatistic.getAllTagBucketNum()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 95f71d0b51..da03d42a64 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; @@ -37,6 +38,7 @@ import com.google.common.collect.Table; import java.util.List; import java.util.Map; +import java.util.Random; import 
java.util.stream.IntStream; public class RebalancerTestUtil { @@ -110,7 +112,8 @@ public class RebalancerTestUtil { } public static void updateReplicaPathHash() { - Table replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable(); + Table replicaMetaTable = + Env.getCurrentInvertedIndex().getReplicaMetaTable(); for (Table.Cell cell : replicaMetaTable.cellSet()) { long beId = cell.getColumnKey(); Backend be = Env.getCurrentSystemInfo().getBackend(beId); @@ -129,4 +132,47 @@ public class RebalancerTestUtil { } } + + public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, int tabletSkew) { + Random random = new Random(); + tableSkew = Math.max(tableSkew, 1); + tabletSkew = Math.max(tabletSkew, 1); + Env env = Env.getCurrentEnv(); + List dbIds = env.getInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + + if (db.isMysqlCompatibleDatabase()) { + continue; + } + + for (org.apache.doris.catalog.Table table : db.getTables()) { + long tableBaseSize = minReplicaSize * (1 + random.nextInt(tableSkew)); + table.readLock(); + try { + if (table.getType() != TableType.OLAP) { + continue; + } + + OlapTable tbl = (OlapTable) table; + for (Partition partition : tbl.getAllPartitions()) { + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + for (Tablet tablet : idx.getTablets()) { + long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew)); + for (Replica replica : tablet.getReplicas()) { + replica.updateStat(tabletSize, 1000L); + } + } + } + } + } finally { + table.readUnlock(); + } + } + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 55abcf9542..d3f5aeeacc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -17,11 +17,14 @@ package org.apache.doris.utframe; +import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.common.ClientPool; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.PBackendServiceGrpc; import org.apache.doris.proto.Types; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.HeartbeatService; @@ -35,6 +38,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TCheckStorageFormatResult; import org.apache.doris.thrift.TCloneReq; import org.apache.doris.thrift.TDiskTrashInfo; +import org.apache.doris.thrift.TDropTabletReq; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportState; @@ -45,6 +49,7 @@ import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TMasterInfo; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -129,11 +134,16 @@ public class MockedBackendFactory { // User can extends this abstract 
class to create other custom be thrift service public abstract static class BeThriftService implements BackendService.Iface { protected MockedBackend backend; + protected Backend backendInFe; public void setBackend(MockedBackend backend) { this.backend = backend; } + public void setBackendInFe(Backend backendInFe) { + this.backendInFe = backendInFe; + } + public abstract void init(); } @@ -156,11 +166,15 @@ public class MockedBackendFactory { @Override public void run() { while (true) { + boolean ok = false; + FrontendService.Client client = null; + TNetworkAddress address = null; try { + address = backend.getFeAddress(); TAgentTaskRequest request = taskQueue.take(); System.out.println( "get agent task request. type: " + request.getTaskType() + ", signature: " - + request.getSignature() + ", fe addr: " + backend.getFeAddress()); + + request.getSignature() + ", fe addr: " + address); TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(tBackend, request.getTaskType(), request.getSignature(), new TStatus(TStatusCode.OK)); TTaskType taskType = request.getTaskType(); @@ -169,35 +183,79 @@ public class MockedBackendFactory { case ALTER: ++reportVersion; break; + case DROP: + handleDropTablet(request, finishTaskRequest); + break; case CLONE: - handleClone(request, finishTaskRequest); + handleCloneTablet(request, finishTaskRequest); break; default: break; } finishTaskRequest.setReportVersion(reportVersion); - FrontendService.Client client = - ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000); - System.out.println("get fe " + backend.getFeAddress() + " client: " + client); + client = ClientPool.frontendPool.borrowObject(address, 2000); client.finishTask(finishTaskRequest); + ok = true; } catch (Exception e) { e.printStackTrace(); + } finally { + if (ok) { + ClientPool.frontendPool.returnObject(address, client); + } else { + ClientPool.frontendPool.invalidateObject(address, client); + } } } } - private void handleClone(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + TDropTabletReq req = request.getDropTabletReq(); + long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); + DiskInfo diskInfo = getDisk(-1); + if (diskInfo != null) { + diskInfo.setDataUsedCapacityB(Math.max(0L, + diskInfo.getDataUsedCapacityB() - dataSize)); + diskInfo.setAvailableCapacityB(Math.min(diskInfo.getTotalCapacityB(), + diskInfo.getAvailableCapacityB() + dataSize)); + } + } + + private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { TCloneReq req = request.getCloneReq(); + long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); + long pathHash = req.dest_path_hash; + DiskInfo diskInfo = getDisk(pathHash); + if (diskInfo != null) { + pathHash = diskInfo.getPathHash(); + diskInfo.setDataUsedCapacityB(Math.min(diskInfo.getTotalCapacityB(), + diskInfo.getDataUsedCapacityB() + dataSize)); + diskInfo.setAvailableCapacityB(Math.max(0L, + diskInfo.getAvailableCapacityB() - dataSize)); + } + List tabletInfos = Lists.newArrayList(); TTabletInfo tabletInfo = new TTabletInfo(req.tablet_id, req.schema_hash, req.committed_version, - req.committed_version_hash, 1, 1); + req.committed_version_hash, 1, dataSize); tabletInfo.setStorageMedium(req.storage_medium); - tabletInfo.setPathHash(req.dest_path_hash); + tabletInfo.setPathHash(pathHash); tabletInfo.setUsed(true); tabletInfos.add(tabletInfo); 
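The mocked DROP and CLONE handlers above keep the fake backend's DiskInfo counters plausible by shifting the tablet's data size between used and available capacity and clamping both to the [0, totalCapacity] range. A minimal sketch of that bookkeeping under the same assumptions; FakeDisk is a hypothetical stand-in, not part of the patch:

    // Hypothetical stand-in for the DiskInfo bookkeeping done by the mocked BE handlers.
    class FakeDisk {
        long totalCapacityB = 10L << 30; // 10 GiB
        long dataUsedCapacityB = 0;
        long availableCapacityB = totalCapacityB;

        // clone: data arrives on this disk
        void addData(long dataSize) {
            dataUsedCapacityB = Math.min(totalCapacityB, dataUsedCapacityB + dataSize);
            availableCapacityB = Math.max(0L, availableCapacityB - dataSize);
        }

        // drop: data leaves this disk
        void removeData(long dataSize) {
            dataUsedCapacityB = Math.max(0L, dataUsedCapacityB - dataSize);
            availableCapacityB = Math.min(totalCapacityB, availableCapacityB + dataSize);
        }
    }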
finishTaskRequest.setFinishTabletInfos(tabletInfos); } + + private DiskInfo getDisk(long pathHash) { + DiskInfo diskInfo = null; + for (DiskInfo tmpDiskInfo : backendInFe.getDisks().values()) { + diskInfo = tmpDiskInfo; + if (diskInfo.getPathHash() == pathHash + || pathHash == -1L || pathHash == 0) { + break; + } + } + + return diskInfo; + } }).start(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 3aa3a464c7..5237909dd4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -454,9 +454,10 @@ public abstract class TestWithFeService { int beArrowFlightSqlPort = findValidPort(); // start be + MockedBackendFactory.BeThriftService beThriftService = new DefaultBeThriftServiceImpl(); MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + beThriftService, new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -467,6 +468,7 @@ public abstract class TestWithFeService { diskInfo1.setTotalCapacityB(1000000); diskInfo1.setAvailableCapacityB(500000); diskInfo1.setDataUsedCapacityB(480000); + diskInfo1.setPathHash(be.getId()); Map disks = Maps.newHashMap(); disks.put(diskInfo1.getRootPath(), diskInfo1); be.setDisks(ImmutableMap.copyOf(disks)); @@ -475,6 +477,7 @@ public abstract class TestWithFeService { be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); be.setArrowFlightSqlPort(beArrowFlightSqlPort); + beThriftService.setBackendInFe(be); Env.getCurrentSystemInfo().addBackend(be); return be; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 8bb8581fd8..db3bc4dadb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -287,9 +287,10 @@ public class UtFrameUtils { int beArrowFlightSqlPort = findValidPort(); // start be + MockedBackendFactory.BeThriftService beThriftService = new DefaultBeThriftServiceImpl(); MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + beThriftService, new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -308,7 +309,9 @@ public class UtFrameUtils { be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); be.setArrowFlightSqlPort(beArrowFlightSqlPort); + beThriftService.setBackendInFe(be); Env.getCurrentSystemInfo().addBackend(be); + return be; }
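For reference, the inter-group balance pass introduced in this patch is driven by two per-backend thresholds, the ceiling and floor of tagTotalBucketNum / availableBeNum, and is skipped entirely when disable_colocate_balance_between_groups is set. A minimal sketch of that arithmetic and of the move condition used when a group is already internally balanced (diffThisGroup == 1); the class and method names are illustrative, not part of the patch:

    // Sketch of the per-backend bucket thresholds used by the inter-group balance pass.
    public final class GroupBalanceMath {
        // ceil(total / beNum): the most buckets any backend should hold
        static int highBucketsPerBe(int tagTotalBucketNum, int availableBeNum) {
            return availableBeNum == 0 ? 0 : (tagTotalBucketNum + availableBeNum - 1) / availableBeNum;
        }

        // floor(total / beNum): the least buckets any backend should hold
        static int lowBucketsPerBe(int tagTotalBucketNum, int availableBeNum) {
            return availableBeNum == 0 ? 0 : tagTotalBucketNum / availableBeNum;
        }

        // A cross-group move is attempted only when the source backend is above the high-water
        // mark and the destination is below the low-water mark (mirrors the diffThisGroup == 1 branch,
        // which "continue"s when srcTotal <= high || destTotal >= low).
        static boolean shouldMoveBetweenGroups(int srcTotalBucketNum, int destTotalBucketNum,
                int high, int low) {
            return srcTotalBucketNum > high && destTotalBucketNum < low;
        }

        public static void main(String[] args) {
            // e.g. 3 groups x 3 buckets = 9 buckets for a tag, 4 available backends:
            int high = highBucketsPerBe(9, 4); // 3
            int low = lowBucketsPerBe(9, 4);   // 2
            // a backend holding 4 buckets may donate one to a backend holding 1
            System.out.println(high + " " + low + " " + shouldMoveBetweenGroups(4, 1, high, low));
        }
    }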