From ae18cebe0bd04bffacf95cad6cb31be2b17ea3cf Mon Sep 17 00:00:00 2001
From: kangkaisen
Date: Sat, 11 May 2019 21:49:51 +0800
Subject: [PATCH] Improve colocate table balance logic for backend added (#1139)

1. Improve colocate table balance logic for backend added
2. Add more comments
3. Break loops early
---
 .../doris/clone/ColocateTableBalancer.java    | 322 ++++++++++--------
 .../clone/ColocateTableBalancerTest.java      | 105 ++++++
 2 files changed, 287 insertions(+), 140 deletions(-)
 create mode 100644 fe/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java

diff --git a/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
index 21640b4d98..d14ba2fd50 100644
--- a/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
+++ b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -18,6 +18,8 @@ package org.apache.doris.clone;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
@@ -43,9 +45,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * when backend remove, down, and add, balance colocate tablets
  */
@@ -70,13 +76,18 @@ public class ColocateTableBalancer extends Daemon {
 
     @Override
     /**
-     * firstly, try marking balancing group to stable if the balance has finished
-     * secondly, try deleting redundant replicas for colocate group balance finished just now
-     * we delay to delete redundant replicas until all clone job is done.
-     * thirdly, try balancing colocate group if we found backend removed, down or added.
+     * The colocate table balance flow:
+     *
+     * 1 balance starts when a backend is removed, down, or added
+     * 2 compute which bucket seqs need to migrate, plus the migration source and target backends
+     * 3 mark the colocate group balancing in the colocate meta
+     * 4 update the colocate backendsPerBucketSeq meta
+     * 5 do the real data migration by clone job
+     * 6 delete redundant replicas after all clone jobs are done
+     * 7 mark the colocate group stable in the colocate meta; the balance is done
      */
     protected void runOneCycle() {
-        tryMarkBalancingGroupStable();
+        checkAndCloneBalancingGroup();
 
         tryDeleteRedundantReplicas();
 
@@ -86,8 +97,9 @@ public class ColocateTableBalancer extends Daemon {
     /**
      * check all balancing colocate group tables
      * if all tables in a colocate group are stable, mark the colocate group stable
+     * else add a clone job for balancing colocate group tables
      */
-    private synchronized void tryMarkBalancingGroupStable() {
+    private synchronized void checkAndCloneBalancingGroup() {
         ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
         Catalog catalog = Catalog.getInstance();
 
@@ -109,7 +121,7 @@ public class ColocateTableBalancer extends Daemon {
                 List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
                 for (long tableId : allTableIds) {
                     OlapTable olapTable = (OlapTable) db.getTable(tableId);
-                    if (checkTableBalancing(db, olapTable, colocateIndex.getBackendsPerBucketSeq(groupId))) {
+                    if (checkAndCloneTable(db, olapTable, colocateIndex.getBackendsPerBucketSeq(groupId))) {
                         isBalancing = true;
                         break;
                     }
@@ -130,11 +142,10 @@ public class ColocateTableBalancer extends Daemon {
      * a: all replica state is not clone
      * b: the tablet backendIds are consistent with ColocateTableIndex's backendsPerBucketSeq
      *
-     * 2 if colocate table is balancing , we will try adding a clone job
-     * handle the FE restart when colocate groups balancing case:
-     * After FE restart, the clone job meta will lose
+     * 2 if the colocate table is balancing, we will try adding a clone job.
+     * clone.addCloneJob has a duplicate check, so we can safely try many times
      */
-    private boolean checkTableBalancing(Database db, OlapTable olapTable, List<List<Long>> backendsPerBucketSeq) {
+    private boolean checkAndCloneTable(Database db, OlapTable olapTable, List<List<Long>> backendsPerBucketSeq) {
         boolean isBalancing = false;
         out:
         for (Partition partition : olapTable.getPartitions()) {
             short replicateNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
@@ -142,7 +153,7 @@ public class ColocateTableBalancer extends Daemon {
                 List<Tablet> tablets = index.getTablets();
                 for (int i = 0; i < tablets.size(); i++) {
                     Tablet tablet = tablets.get(i);
-                    //1 check all replica state is not clone
+                    // 1 check all replica state is not clone
                     for (Replica replica : tablet.getReplicas()) {
                         if (replica.getState().equals(Replica.ReplicaState.CLONE)) {
                             isBalancing = true;
@@ -153,26 +164,26 @@ public class ColocateTableBalancer extends Daemon {
                     List<Long> groupBackends = new ArrayList<>(backendsPerBucketSeq.get(i));
                     Set<Long> tabletBackends = tablet.getBackendIds();
-                    //2 check the tablet backendIds are consistent with ColocateTableIndex's backendsPerBucketSeq
+                    // 2 check the tablet backendIds are consistent with ColocateTableIndex's backendsPerBucketSeq
                     if (!tabletBackends.containsAll(groupBackends)) {
                         isBalancing = true;
                         LOG.info("colocate group : {} is still balancing, may be clone job hasn't run, try adding a clone job",
                                 olapTable.getColocateTable());
-                        //try adding a clone job
-                        //clone.addCloneJob has duplicated check, so there isn't side-effect
+                        // try adding a clone job
+                        // clone.addCloneJob has a duplicate check, so there is no side effect
                         List<Long> clusterAliveBackendIds = getAliveClusterBackendIds(db.getClusterName());
                         groupBackends.removeAll(tabletBackends);
-                        //for backend added;
+                        // for backend added
                         if (clusterAliveBackendIds.containsAll(tabletBackends)) {
-                            //we can ignore tabletSizeB parameter here
+                            // we can ignore the tabletSizeB parameter here
                             CloneTabletInfo tabletInfo = new CloneTabletInfo(db.getId(), olapTable.getId(), partition.getId(),
                                     index.getId(), tablet.getId(), replicateNum, replicateNum, 0, tabletBackends);
                             for (Long cloneBackend : groupBackends) {
                                 AddMigrationJob(tabletInfo, cloneBackend);
                             }
-                        } else { //for backend down or removed
+                        } else { // for backend down or removed
                             short onlineReplicaNum = (short) (replicateNum - groupBackends.size());
                             CloneTabletInfo tabletInfo = new CloneTabletInfo(db.getId(), olapTable.getId(), partition.getId(),
                                     index.getId(), tablet.getId(), replicateNum, onlineReplicaNum, 0, tabletBackends);
@@ -182,9 +193,9 @@ public class ColocateTableBalancer extends Daemon {
                         }
                     }
                 }
-            } //end tablet
-        } //end index
-    } //end partition
+            } // end tablet
+        } // end index
+    } // end partition
         return isBalancing;
     }
@@ -208,12 +219,12 @@ public class ColocateTableBalancer extends Daemon {
             Set<Long> allGroupBackendIds = colocateIndex.getBackendsByGroup(groupId);
             List<List<Long>> backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
 
-            //1 check backend removed or down
+            // 1 check backend removed or down
             if (!clusterAliveBackendIds.containsAll(allGroupBackendIds)) {
                 Set<Long> removedBackendIds = Sets.newHashSet(allGroupBackendIds);
                 removedBackendIds.removeAll(clusterAliveBackendIds);
-                //A backend in Colocate group but not alive, which means the backend is removed or down
+                // a backend in a colocate group but not alive means the backend is removed or down
                 Iterator<Long> removedBackendIdsIterator = removedBackendIds.iterator();
                 while (removedBackendIdsIterator.hasNext()) {
                     Long removedBackendId = (Long) removedBackendIdsIterator.next();
@@ -226,19 +237,19 @@ public class ColocateTableBalancer extends Daemon {
 
                 if (!removedBackendIds.isEmpty()) {
                     LOG.info("removedBackendIds {} for colocate group {}", removedBackendIds, groupId);
-                    //multiple backend removed is unusual, so we handle one by one
+                    // multiple backends being removed is unusual, so we handle them one by one
                     for (Long backendId : removedBackendIds) {
                         balanceForBackendRemoved(db, groupId, backendId);
                     }
-                    continue; //for one colocate group, only handle backend removed or added event once
+                    continue; // for one colocate group, only handle a backend removed or added event once
                 }
             }
 
             //2 check backend added
             int replicateNum = backendsPerBucketSeq.get(0).size();
             if (backendsPerBucketSeq.size() * replicateNum <= allGroupBackendIds.size()) {
-                //if each tablet replica has a different backend, which means the colocate group
-                //has fully balanced. we can ignore the new backend added.
+                // if each tablet replica is on a different backend, the colocate group
+                // is fully balanced and we can ignore the new backend added
                 LOG.info("colocate group {} has already fully balanced. skip", groupId);
                 continue;
             }
@@ -254,7 +265,7 @@ public class ColocateTableBalancer extends Daemon {
         }
     }
 
-    //get the backends: 1 belong to this cluster; 2 alive; 3 not decommissioned
+    // get the backends that: 1 belong to this cluster; 2 are alive; 3 are not decommissioned
    private List<Long> getAliveClusterBackendIds(String clusterName) {
        SystemInfoService systemInfo = Catalog.getCurrentSystemInfo();
        List<Long> clusterBackendIds = systemInfo.getClusterBackendIds(clusterName, true);
@@ -270,7 +281,7 @@ public class ColocateTableBalancer extends Daemon {
        Set<Long> allGroupIds = colocateIndex.getAllGroupIds();
        for (Long groupId : allGroupIds) {
            Set<Long> balancingGroups = colocateIndex.getBalancingGroupIds();
-            //only delete reduntdant replica when group is stable
+            // only delete redundant replicas when the group is stable
            if (!balancingGroups.contains(groupId)) {
                Database db = catalog.getDb(colocateIndex.getDB(groupId));
                List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
@@ -303,11 +314,9 @@ public class ColocateTableBalancer extends Daemon {
 
                if (deleteTabletSet.size() > 0) {
                    LOG.info("colocate group {} will delete tablet {}", groupId, deleteTabletSet);
-                    //delete tablet will affect colocate table local query schedule,
-                    //so make colocate group balancing again
-                    colocateIndex.markGroupBalancing(groupId);
-                    ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId);
-                    Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
+                    // deleting tablets will affect colocate table local query scheduling,
+                    // so mark the colocate group balancing again
+                    markGroupBalancing(groupId);
                    for (CloneTabletInfo tabletInfo : deleteTabletSet) {
                        deleteRedundantReplicas(db, tabletInfo);
                    }
@@ -331,7 +340,7 @@ public class ColocateTableBalancer extends Daemon {
        Tablet tablet = index.getTablet(tabletId);
        List<Replica> replicas = tablet.getReplicas();
 
-        //delete replica for backend removed
+        // delete replicas for backend removed
        List<Replica> copyReplicas = new ArrayList<>(replicas);
        for (Replica replica : copyReplicas) {
            long backendId = replica.getBackendId();
@@ -340,7 +349,7 @@ public class ColocateTableBalancer extends Daemon {
            }
        }
 
-        //delete replica for backend added
+        // delete replicas for backend added
        List<Replica> updatedReplicas = tablet.getReplicas();
        short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
        if (updatedReplicas.size() <= replicationNum) {
@@ -350,7 +359,7 @@ public class ColocateTableBalancer extends Daemon {
        int deleteNum = updatedReplicas.size() - replicationNum;
        List<Long> sortedReplicaIds = sortReplicaId(updatedReplicas);
 
-        //always delete replica which id is minimum
+        // always delete the replicas whose ids are minimum
        for (int i = 0; i < deleteNum; i++) {
            Replica deleteReplica = tablet.getReplicaById(sortedReplicaIds.get(i));
            deleteReplica(tablet, deleteReplica, db.getId(), tableId, partitionId, indexId);
@@ -382,11 +391,9 @@ public class ColocateTableBalancer extends Daemon {
    }
 
    /**
-     * 1 compute need delete BucketSeqs in the removedBackend
-     * 2 select clone replica BackendId for the new Replica
-     * 3 mark colocate group balancing
-     * 4 add a Supplement Job
-     * 5 update the TableColocateIndex backendsPerBucketSeq metadata
+     * 1 compute which bucket seqs need to migrate and the migration target backend
+     * 2 mark the colocate group balancing in the colocate meta
+     * 3 update the colocate backendsPerBucketSeq meta
     */
    private void balanceForBackendRemoved(Database db, Long groupId, Long removedBackendId) {
        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
@@ -431,28 +438,21 @@ public class ColocateTableBalancer extends Daemon {
                        long cloneReplicaBackendId = selectCloneBackendIdForRemove(newGroup2BackendsPerBucketSeq, groupId, i,
                                db.getClusterName(), tabletInfo);
                        if (cloneReplicaBackendId != -1L) {
-                            if (!colocateIndex.isGroupBalancing(groupId)) {
-                                colocateIndex.markGroupBalancing(groupId);
-                                ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId);
-                                Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
-                            }
+                            markGroupBalancing(groupId);
 
-                            //update TableColocateIndex groupBucket2BEs
                            List<Long> backends = newBackendsPerBucketSeq.get(i);
                            backends.remove(removedBackendId);
                            if (!backends.contains(cloneReplicaBackendId)) {
                                backends.add(cloneReplicaBackendId);
                            }
 
-                            AddSupplementJob(tabletInfo, cloneReplicaBackendId);
+                            Preconditions.checkState(replicateNum == backends.size(), replicateNum + " vs. " + backends.size());
                        }
                    }
                }
            }
 
-            colocateIndex.addBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
-            ColocatePersistInfo info = ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
-            Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info);
+            persistBackendsToBucketSeqMeta(groupId, newBackendsPerBucketSeq);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
@@ -468,7 +468,7 @@ public class ColocateTableBalancer extends Daemon {
        return newBackendsPerBucketSeq;
    }
 
-    //this logic is like CloneChecker.checkTabletForSupplement and CloneChecker.addCloneJob
+    // this logic is like CloneChecker.checkTabletForSupplement and CloneChecker.addCloneJob
    private Long selectCloneBackendIdForRemove(com.google.common.collect.Table<Long, Integer, Long> newGroup2BackendsPerBucketSeq,
            long group, int bucketSeq, String clusterName, CloneTabletInfo tabletInfo) {
        Long cloneReplicaBackendId = null;
        cloneReplicaBackendId = newGroup2BackendsPerBucketSeq.get(group, bucketSeq);
@@ -507,104 +507,30 @@ public class ColocateTableBalancer extends Daemon {
    }
 
    /**
-     * balance after new backend added
-     *
-     * 1 compute the the number of bucket seqs need to move from the each old backend and
-     * the number of bucket seqs need to move to the each new backend
-     * 2 select the clone target Backend for the new Replica
-     * 3 mark colocate group balancing
-     * 4 add a Migration Job
-     * 5 update the ColocateTableIndex's backendsPerBucketSeq
+     * 1 compute which bucket seqs need to migrate, plus the migration source and target backends
+     * 2 mark the colocate group balancing in the colocate meta
+     * 3 update the colocate backendsPerBucketSeq meta
     *
     * For example:
-     * There are 3 backend and 4 tablet, and replicateNum is 3.
-     *
-     * the mapping from tablet to backend to is following:
-     *
-     * tablet1 : [1, 2, 3]
-     * tablet2 : [2, 1, 3]
-     * tablet3 : [3, 2, 1]
-     * tablet4 : [1, 2, 3]
-     *
-     * After Adding a new backend:
-     *
-     * the needMoveBucketSeqs = 4 * 3 / (3 + 1) = 3
-     * the bucketSeqsPerNewBackend = 3 / 1 = 1
-     *
-     * After balancing, the mapping from tablet to backend to is following:
-     *
-     * tablet1 : [4, 2, 3]
-     * tablet2 : [4, 1, 3]
-     * tablet3 : [4, 2, 1]
-     * tablet4 : [1, 2, 3]
+     * the old backendsPerBucketSeq is:
+     * [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
+     *
+     * after we add two new backends: [5, 6]
+     *
+     * the balanced backendsPerBucketSeq will become:
+     * [[5, 6, 3], [6, 1, 2], [5, 4, 1], [2, 3, 4], [1, 2, 3]]
     *
     */
    private void balanceForBackendAdded(Long groupId, Database db, List<Long> addedBackendIds) {
        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
-        com.google.common.collect.Table<Long, Integer, Long> newGroup2BackendsPerBucketSeq = HashBasedTable.create();
        List<List<Long>> backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
-        int replicateNum = backendsPerBucketSeq.get(0).size();
-        Set<Long> allGroupBackendIds = colocateIndex.getBackendsByGroup(groupId);
-
-        List<List<Long>> newBackendsPerBucketSeq = deepCopy(backendsPerBucketSeq);
-
-        int needMoveBucketSeqs = backendsPerBucketSeq.size() * replicateNum / (allGroupBackendIds.size() + addedBackendIds.size());
-        int bucketSeqsPerNewBackend = needMoveBucketSeqs / addedBackendIds.size();
-        LOG.info("for colocate group {}, needMoveBucketSeqs : {} , bucketSeqPerNewBackend: {}", groupId, needMoveBucketSeqs, bucketSeqsPerNewBackend);
 
        db.readLock();
        try {
-            List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
-            for (long tableId : allTableIds) {
-                OlapTable olapTable = (OlapTable) db.getTable(tableId);
-                for (Partition partition : olapTable.getPartitions()) {
-                    replicateNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
-                    for (MaterializedIndex index : partition.getMaterializedIndices()) {
-                        List<Tablet> tablets = index.getTablets();
-                        for (int i = 0; i < tablets.size() && i < needMoveBucketSeqs; i++) {
-                            Tablet tablet = tablets.get(i);
-                            List<Replica> replicas = tablet.getReplicas();
-                            List<Long> sortedReplicaIds = sortReplicaId(replicas);
-                            //always delete replica which id is minimum
-                            Replica deleteReplica = tablet.getReplicaById(sortedReplicaIds.get(0));
-
-                            long tabletSizeB = deleteReplica.getDataSize() * partition.getMaterializedIndices().size()
-                                    * olapTable.getPartitions().size() * allTableIds.size();
-                            CloneTabletInfo tabletInfo = new CloneTabletInfo(db.getId(), tableId, partition.getId(),
-                                    index.getId(), tablet.getId(), (short) replicateNum, (short) replicateNum,
-                                    tabletSizeB, tablet.getBackendIds());
-
-                            Long cloneReplicaBackendId = newGroup2BackendsPerBucketSeq.get(groupId, i);
-                            if (cloneReplicaBackendId == null) {
-                                // select dest backend
-                                cloneReplicaBackendId = addedBackendIds.get(i % addedBackendIds.size());
-                                newGroup2BackendsPerBucketSeq.put(groupId, i, cloneReplicaBackendId);
-                            }
-
-                            if (!colocateIndex.isGroupBalancing(groupId)) {
-                                colocateIndex.markGroupBalancing(groupId);
-                                ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId);
-                                Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
-                            }
-
-                            //update ColocateTableIndex backendsPerBucketSeq
-                            List<Long> backends = newBackendsPerBucketSeq.get(i);
-                            backends.remove(deleteReplica.getBackendId());
-                            if (!backends.contains(cloneReplicaBackendId)) {
-                                backends.add(cloneReplicaBackendId);
-                            }
-
-                            Preconditions.checkState(replicateNum == backends.size(), replicateNum + " vs. " + backends.size());
-
-                            AddMigrationJob(tabletInfo, cloneReplicaBackendId);
-                        }
-                    }
-                }
-            }
-            colocateIndex.addBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
-            ColocatePersistInfo info = ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
-            Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info);
+            List<List<Long>> newBackendsPerBucketSeq = balance(backendsPerBucketSeq, addedBackendIds);
+            markGroupBalancing(groupId);
+            persistBackendsToBucketSeqMeta(groupId, newBackendsPerBucketSeq);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
@@ -612,6 +538,122 @@ public class ColocateTableBalancer extends Daemon {
        }
    }
 
+    /**
+     * Returns a map whose key is the backend id and whose value is the replica num on that backend.
+     * The map is sorted by replica num in descending order.
+     *
+     * @param backends the backend id list
+     * @return a descending sorted map
+     */
+    private static Map<Long, Long> getBackendToReplicaNums(List<Long> backends) {
+        Map<Long, Long> backendCounter = backends.stream()
+                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+
+        return backendCounter.entrySet().stream().sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+    }
+
+    /**
+     * balance the bucket seqs according to the new backends
+     *
+     * the balance logic is simple:
+     *
+     * 1 compute the replica num on each old backend
+     * 2 compute the avg replica num after the new backends are added
+     * 3 migrate replicas from the old backends to the new backends,
+     *   preferring the old backend that has more replicas
+     *
+     * @param backendsPerBucketSeq the mapping from bucket seq to backends
+     * @param newBackends the new backends that need to balance
+     * @return the balanced mapping from bucket seq to backends
+     */
+    public static List<List<Long>> balance(List<List<Long>> backendsPerBucketSeq, List<Long> newBackends) {
+        // the replicateNum for a partition in a colocate group
+        int replicaNum = backendsPerBucketSeq.get(0).size();
+        int allReplicaNum = backendsPerBucketSeq.size() * replicaNum;
+        Set<Long> groupBackendSet = backendsPerBucketSeq.stream().flatMap(List::stream).collect(Collectors.toSet());
+        List<Long> flatBackendsPerBucketSeq = backendsPerBucketSeq.stream().flatMap(List::stream).collect(Collectors.toList());
+        Map<Long, Long> backendToReplicaNums = getBackendToReplicaNums(flatBackendsPerBucketSeq);
+
+        int allBackendSize = groupBackendSet.size() + newBackends.size();
+
+        // normally avgReplicaNum equals allReplicaNum / allBackendSize,
+        // but when allBackendSize is larger than allReplicaNum, the avgReplicaNum should be 1, not 0
+        int avgReplicaNum = Math.max(allReplicaNum / allBackendSize, 1);
+
+        // normally needBalanceNum equals avgReplicaNum * the number of new backends,
+        // but when allBackendSize is larger than allReplicaNum, we should ensure each old backend keeps at least one replica.
+        // for example: when allReplicaNum is 15, the old backend num is 4 and the new backend num is 12,
+        // the needBalanceNum should be 15 - 4 = 11, not 12
+        int needBalanceNum = Math.min(avgReplicaNum * newBackends.size(), allReplicaNum - groupBackendSet.size());
+
+        LOG.info("avgReplicaNum: {}", avgReplicaNum);
+        LOG.info("needBalanceNum: {}", needBalanceNum);
+
+        int hasBalancedNum = 0;
+        // keep track of which bucket seqs have already migrated to each new target backend
+        Map<Long, List<Integer>> targetBackendsToBucketSeqs = Maps.newHashMap();
+        while (hasBalancedNum < needBalanceNum) {
+            // in one pass, we migrate at most newBackends.size() bucket seqs, because backendToReplicaNums changes as we go
+            for (Map.Entry<Long, Long> backendToReplicaNum : backendToReplicaNums.entrySet()) {
+                long sourceBackend = backendToReplicaNum.getKey();
+                long sourceReplicaNum = backendToReplicaNum.getValue();
+
+                // a new backend should not be the source backend; after one pass, the new backends appear in backendToReplicaNums
+                if (newBackends.contains(sourceBackend)) {
+                    continue;
+                }
+
+                if (sourceReplicaNum > avgReplicaNum) {
+                    Long targetBackend = newBackends.get(hasBalancedNum % newBackends.size());
+
+                    List<Integer> sourceIndexes = IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed()
+                            .filter(i -> flatBackendsPerBucketSeq.get(i).equals(sourceBackend))
+                            .collect(Collectors.toList());
+
+                    for (int sourceIndex : sourceIndexes) {
+                        int sourceBucketSeq = sourceIndex / replicaNum;
+
+                        // for one bucket seq, all replicas should be on different backends
+                        List<Integer> chosenSourceBucketSeqs = targetBackendsToBucketSeqs.getOrDefault(targetBackend, Lists.newArrayList());
+                        if (!chosenSourceBucketSeqs.contains(sourceBucketSeq)) {
+                            flatBackendsPerBucketSeq.set(sourceIndex, targetBackend);
+
+                            chosenSourceBucketSeqs.add(sourceBucketSeq);
+                            targetBackendsToBucketSeqs.put(targetBackend, chosenSourceBucketSeqs);
+
+                            hasBalancedNum++;
+                            break;
+                        }
+                    }
+                }
+
+                if (hasBalancedNum >= needBalanceNum) {
+                    break;
+                }
+            }
+            // reorder because the replica num on each backend has changed
+            backendToReplicaNums = getBackendToReplicaNums(flatBackendsPerBucketSeq);
+        }
+        return Lists.partition(flatBackendsPerBucketSeq, replicaNum);
+    }
+
+    private void markGroupBalancing(long groupId) {
+        if (!Catalog.getCurrentColocateIndex().isGroupBalancing(groupId)) {
+            Catalog.getCurrentColocateIndex().markGroupBalancing(groupId);
+            ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId);
+            Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
+            LOG.info("mark group {} balancing", groupId);
+        }
+    }
+
+    private void persistBackendsToBucketSeqMeta(long groupId, List<List<Long>> newBackendsPerBucketSeq) {
+        Catalog.getCurrentColocateIndex().addBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
+        ColocatePersistInfo info = ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, newBackendsPerBucketSeq);
+        Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info);
+        LOG.info("persist backendsPerBucketSeq {} for group {}", newBackendsPerBucketSeq, groupId);
+    }
+
     // for backend down or removed
     private void AddSupplementJob(CloneTabletInfo tabletInfo, long cloneBackendId) {
         Clone clone = Catalog.getInstance().getCloneInstance();
diff --git a/fe/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
new file mode 100644
index 0000000000..139efcb60e
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class ColocateTableBalancerTest {
+
+    private static final int replicateNum = 3;
+    // [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
+    private static final List<List<Long>> backendsPerBucketSeq =
+            Lists.partition(Lists.newArrayList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L), replicateNum);
+
+    @Test
+    /*
+     * backends: [1,2,3,4]
+     * bucket num: 5
+     * replicateNum: 3
+     * new backends: [5]
+     */
+    public void testBalanceNormalWithOneBackend() {
+        List<Long> newBackends = Lists.newArrayList(5L);
+        System.out.println("newBackends: " + newBackends);
+
+        List<List<Long>> newBackendsPerBucketSeq = ColocateTableBalancer.balance(backendsPerBucketSeq, newBackends);
+        System.out.println("new backendsPerBucketSeq: " + newBackendsPerBucketSeq);
+
+        List<List<Long>> expectBackendsPerBucketSeq =
+                Lists.partition(Lists.newArrayList(5L, 2L, 3L, 4L, 1L, 5L, 5L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L), replicateNum);
+        Assert.assertEquals(expectBackendsPerBucketSeq, newBackendsPerBucketSeq);
+    }
+
+    @Test
+    /*
+     * backends: [1,2,3,4]
+     * bucket num: 5
+     * replicateNum: 3
+     * new backends: [5,6]
+     */
+    public void testBalanceNormalWithTwoBackend() {
+        List<Long> newBackends = Lists.newArrayList(5L, 6L);
+        System.out.println("newBackends: " + newBackends);
+
+        List<List<Long>> newBackendsPerBucketSeq = ColocateTableBalancer.balance(backendsPerBucketSeq, newBackends);
+        System.out.println("new backendsPerBucketSeq: " + newBackendsPerBucketSeq);
+
+        List<List<Long>> expectBackendsPerBucketSeq =
+                Lists.partition(Lists.newArrayList(5L, 6L, 3L, 6L, 1L, 2L, 5L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L), replicateNum);
+        Assert.assertEquals(expectBackendsPerBucketSeq, newBackendsPerBucketSeq);
+    }
+
+    @Test
+    /*
+     * backends: [1,2,3,4]
+     * bucket num: 5
+     * replicateNum: 3
+     * new backends: [5,6,7,8,9,10,11,12,13,14,15]
+     */
+    public void testBalanceNormalWithManyBackendEqualReplicateNum() {
+        List<Long> newBackends = Lists.newArrayList(5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L);
+        System.out.println("newBackends: " + newBackends);
+
+        List<List<Long>> newBackendsPerBucketSeq = ColocateTableBalancer.balance(backendsPerBucketSeq, newBackends);
+        System.out.println("new backendsPerBucketSeq: " + newBackendsPerBucketSeq);
+
+        List<List<Long>> expectBackendsPerBucketSeq =
+                Lists.partition(Lists.newArrayList(5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 4L, 1L, 2L, 3L), replicateNum);
+        Assert.assertEquals(expectBackendsPerBucketSeq, newBackendsPerBucketSeq);
+    }
+
+    @Test
+    /*
+     * backends: [1,2,3,4]
+     * bucket num: 5
+     * replicateNum: 3
+     * new backends: [5,6,7,8,9,10,11,12,13,14,15,16]
+     */
+    public void testBalanceNormalWithManyBackendExceedReplicateNum() {
+        List<Long> newBackends = Lists.newArrayList(5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L);
+        System.out.println("newBackends: " + newBackends);
+
+        List<List<Long>> newBackendsPerBucketSeq = ColocateTableBalancer.balance(backendsPerBucketSeq, newBackends);
+        System.out.println("new backendsPerBucketSeq: " + newBackendsPerBucketSeq);
+
+        List<List<Long>> expectBackendsPerBucketSeq =
+                Lists.partition(Lists.newArrayList(5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 4L, 1L, 2L, 3L), replicateNum);
+        Assert.assertEquals(expectBackendsPerBucketSeq, newBackendsPerBucketSeq);
+    }
+}
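
A note on the counting idiom used by getBackendToReplicaNums in the patch above: Collectors.groupingBy with Collectors.counting builds the per-backend frequency map, and re-collecting into a LinkedHashMap freezes the descending iteration order (a plain HashMap would not). The following self-contained snippet demonstrates that idiom on the bucket sequence from the javadoc example; the class name CountDemo and the variable names are illustrative only, not part of the patch. The relative order of backends with equal counts is unspecified, because it depends on the intermediate HashMap's iteration order.

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountDemo {
    public static void main(String[] args) {
        // flattened backendsPerBucketSeq: [[1,2,3],[4,1,2],[3,4,1],[2,3,4],[1,2,3]]
        List<Long> backends = Arrays.asList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L);

        // step 1: backend id -> replica count
        Map<Long, Long> counter = backends.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // step 2: re-insert into a LinkedHashMap in descending count order;
        // the merge function (e1, e2) -> e1 is never used here because keys are unique
        Map<Long, Long> sorted = counter.entrySet().stream()
                .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (e1, e2) -> e1, LinkedHashMap::new));

        System.out.println(sorted); // e.g. {1=4, 2=4, 3=4, 4=3}
    }
}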
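
And a standalone sketch of the redistribution performed by balance() in the patch, runnable outside the FE code base for experimentation. The class name BalanceSketch and its helpers are hypothetical, and it assumes ascending backend-id order as the tie-breaker between equally loaded backends, whereas the patch inherits whatever order the intermediate HashMap yields, so results can differ on other inputs. On the javadoc example it reproduces [[5, 6, 3], [6, 1, 2], [5, 4, 1], [2, 3, 4], [1, 2, 3]].

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BalanceSketch {

    // mirror of getBackendToReplicaNums: replica count per backend, descending by count,
    // with backend id ascending as an explicit (assumed) tie-breaker
    static Map<Long, Long> countReplicas(List<Long> flat) {
        Map<Long, Long> counts = new HashMap<>();
        flat.forEach(b -> counts.merge(b, 1L, Long::sum));
        Map<Long, Long> sorted = new LinkedHashMap<>();
        counts.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<Long, Long>comparingByKey()))
                .forEach(e -> sorted.put(e.getKey(), e.getValue()));
        return sorted;
    }

    // mutates and returns the flattened bucket-seq-to-backend list
    static List<Long> balance(List<Long> flat, List<Long> newBackends, int replicaNum) {
        int allReplicaNum = flat.size();
        int oldBackendNum = countReplicas(flat).size();
        int avg = Math.max(allReplicaNum / (oldBackendNum + newBackends.size()), 1);
        int toMove = Math.min(avg * newBackends.size(), allReplicaNum - oldBackendNum);

        int moved = 0;
        Map<Long, List<Integer>> takenBuckets = new HashMap<>(); // target backend -> bucket seqs already used
        // like the patch, this assumes a migration is always possible while moved < toMove
        while (moved < toMove) {
            // counts are fixed for one pass, then recomputed, as in the patch
            for (Map.Entry<Long, Long> e : countReplicas(flat).entrySet()) {
                if (newBackends.contains(e.getKey()) || e.getValue() <= avg) {
                    continue; // never migrate from a new or already-light backend
                }
                long target = newBackends.get(moved % newBackends.size());
                for (int i = 0; i < flat.size(); i++) {
                    int bucket = i / replicaNum;
                    List<Integer> taken = takenBuckets.computeIfAbsent(target, t -> new ArrayList<>());
                    // keep all replicas of one bucket seq on different backends
                    if (flat.get(i).equals(e.getKey()) && !taken.contains(bucket)) {
                        flat.set(i, target); // migrate one replica of this source backend
                        taken.add(bucket);
                        moved++;
                        break;
                    }
                }
                if (moved >= toMove) {
                    break;
                }
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        // the example from the balanceForBackendAdded javadoc: 5 bucket seqs, replication 3
        List<Long> flat = new ArrayList<>(Arrays.asList(
                1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
        System.out.println(balance(flat, Arrays.asList(5L, 6L), 3));
        // prints [5, 6, 3, 6, 1, 2, 5, 4, 1, 2, 3, 4, 1, 2, 3]
    }
}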