From ffc6596ceffd1a2bfff37529b6e49d1e2419c236 Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 26 Dec 2023 17:36:05 +0800 Subject: [PATCH] [refactor](create tablet) default create tablet round robin (#28911) --- .../java/org/apache/doris/common/Config.java | 8 +- .../org/apache/doris/backup/RestoreJob.java | 3 +- .../org/apache/doris/catalog/OlapTable.java | 3 +- .../common/util/DynamicPartitionUtil.java | 13 +- .../doris/common/util/PropertyAnalyzer.java | 3 +- .../doris/datasource/InternalCatalog.java | 40 +--- .../doris/system/BeSelectionPolicy.java | 15 ++ .../doris/system/SystemInfoService.java | 211 ++++++++---------- .../apache/doris/backup/RestoreJobTest.java | 10 +- .../doris/catalog/ReplicaAllocationTest.java | 3 +- .../load/sync/canal/CanalSyncDataTest.java | 2 +- .../doris/system/SystemInfoServiceTest.java | 2 +- 12 files changed, 150 insertions(+), 163 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 61a135ed96..4c92449cb5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2075,7 +2075,13 @@ public class Config extends ConfigBase { public static boolean skip_localhost_auth_check = true; @ConfField(mutable = true) - public static boolean enable_round_robin_create_tablet = false; + public static boolean enable_round_robin_create_tablet = true; + + @ConfField(mutable = true, masterOnly = true, description = { + "创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡", + "When creating tablet of a partition, always start from the first BE. " + + "Note: This method may cause BE imbalance"}) + public static boolean create_tablet_round_robin_from_start = false; /** * To prevent different types (V1, V2, V3) of behavioral inconsistencies, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6d4f68e985..23ca4a335f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1113,6 +1113,7 @@ public class RestoreJob extends AbstractJob { long visibleVersion = remotePart.getVisibleVersion(); // tablets + Map nextIndexs = Maps.newHashMap(); for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) { int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); @@ -1127,7 +1128,7 @@ public class RestoreJob extends AbstractJob { // replicas try { Map> beIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false); + .selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false); for (Map.Entry> entry : beIds.entrySet()) { for (Long beId : entry.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0ce2df5e6f..d12dc9512b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -589,6 +589,7 @@ public class OlapTable extends Table { } // for each partition, reset rollup index map + Map nextIndexs = Maps.newHashMap(); for (Map.Entry entry : idToPartition.entrySet()) { Partition partition = entry.getValue(); // entry.getKey() is the new partition id, use it to get the restore specified replica allocation @@ -616,7 +617,7 @@ public class OlapTable extends Table { try { Map> tag2beIds = Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( - replicaAlloc, null, false, false); + replicaAlloc, nextIndexs, null, false, false); for (Map.Entry> entry3 : tag2beIds.entrySet()) { for (Long beId : entry3.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index de6101181f..ff2c943305 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -40,10 +40,12 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import com.google.common.collect.Range; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,7 +230,8 @@ public class DynamicPartitionUtil { ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val); } ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val)); - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, Maps.newHashMap(), + null, false, true); } private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum, @@ -237,14 +240,16 @@ public class DynamicPartitionUtil { ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO); } - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Map nextIndexs = Maps.newHashMap(); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, + false, true); if (hotPartitionNum <= 0) { return; } try { - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false, - true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, + TStorageMedium.SSD, false, true); } catch (DdlException e) { throw new DdlException("Failed to find enough backend for ssd storage medium. When setting " + DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store " diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index ca0310769e..c8c0978ca7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1069,6 +1069,7 @@ public class PropertyAnalyzer { allocationVal = allocationVal.replaceAll(" ", ""); String[] locations = allocationVal.split(","); int totalReplicaNum = 0; + Map nextIndexs = Maps.newHashMap(); for (String location : locations) { String[] parts = location.split(":"); if (parts.length != 2) { @@ -1092,7 +1093,7 @@ public class PropertyAnalyzer { try { SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); systemInfoService.selectBackendIdsForReplicaCreation( - replicaAlloc, null, false, true); + replicaAlloc, nextIndexs, null, false, true); } catch (DdlException ddlException) { throw new AnalysisException(ddlException.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index dea018b642..2e031faede 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2743,6 +2743,7 @@ public class InternalCatalog implements CatalogIf { Set tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); Map>> backendsPerBucketSeq = null; GroupId groupId = null; if (colocateIndex.isColocateTable(tabletMeta.getTableId())) { @@ -2762,16 +2763,18 @@ public class InternalCatalog implements CatalogIf { backendsPerBucketSeq = Maps.newHashMap(); } + TStorageMedium storageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium(); Map nextIndexs = new HashMap<>(); - if (Config.enable_round_robin_create_tablet) { - for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { - int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), - tabletMeta.getStorageMedium()); - if (startPos == -1) { - throw new DdlException("The number of BEs that match the policy is insufficient"); + for (Tag tag : replicaAlloc.getAllocMap().keySet()) { + int startPos = -1; + if (Config.create_tablet_round_robin_from_start) { + startPos = 0; + } else { + startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium, + isStorageMediumSpecified); } - nextIndexs.put(entry.getKey(), startPos); + nextIndexs.put(tag, startPos); } } @@ -2788,27 +2791,8 @@ public class InternalCatalog implements CatalogIf { if (chooseBackendsArbitrary) { // This is the first colocate table in the group, or just a normal table, // choose backends - if (Config.enable_round_robin_create_tablet) { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - nextIndexs); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, null, - nextIndexs); - } - } else { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - isStorageMediumSpecified, false); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, - isStorageMediumSpecified, false); - } - } + chosenBackendIds = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, + storageMedium, isStorageMediumSpecified, false); for (Map.Entry> entry : chosenBackendIds.entrySet()) { backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 3a711307bd..ace2ab3e1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -51,6 +51,11 @@ public class BeSelectionPolicy { public boolean preferComputeNode = false; public int expectBeNum = 0; + public boolean enableRoundRobin = false; + // if enable round robin, choose next be from nextRoundRobinIndex + // call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex + public int nextRoundRobinIndex = -1; + public List preferredLocations = new ArrayList<>(); private BeSelectionPolicy() { @@ -114,6 +119,16 @@ public class BeSelectionPolicy { return this; } + public Builder setEnableRoundRobin(boolean enableRoundRobin) { + policy.enableRoundRobin = enableRoundRobin; + return this; + } + + public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) { + policy.nextRoundRobinIndex = nextRoundRobinIndex; + return this; + } + public BeSelectionPolicy build() { return policy; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 3ccb6d6345..9c1e196923 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -54,6 +54,7 @@ import org.jetbrains.annotations.NotNull; import java.io.DataInputStream; import java.io.IOException; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -415,86 +416,42 @@ public class SystemInfoService { return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList()); } - class BeComparator implements Comparator { + class BeIdComparator implements Comparator { public int compare(Backend a, Backend b) { return (int) (a.getId() - b.getId()); } } - public List selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number, - int nextIndex) { - Preconditions.checkArgument(number >= -1); - List candidates = getCandidates(policy); - if (number != -1 && candidates.size() < number) { - LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); - return Lists.newArrayList(); + class BeHostComparator implements Comparator { + public int compare(Backend a, Backend b) { + return a.getHost().compareTo(b.getHost()); } - - int realIndex = nextIndex % candidates.size(); - List partialOrderList = new ArrayList(); - partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - partialOrderList.addAll(candidates.subList(0, realIndex) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - - if (number == -1) { - return partialOrderList; - } else { - return partialOrderList.subList(0, number); - } - } - - public List getCandidates(BeSelectionPolicy policy) { - List candidates = policy.getCandidateBackends(idToBackendRef.values()); - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); - } - - if (!policy.allowOnSameHost) { - Map> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); - } - } - candidates.clear(); - for (List list : backendMaps.values()) { - candidates.add(list.get(0)); - } - } - - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); - } - - Collections.sort(candidates, new BeComparator()); - return candidates; } // Select the smallest number of tablets as the starting position of // round robin in the BE that match the policy - public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) { + public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium, boolean isStorageMediumSpecified) { BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() - .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag)) + .needScheduleAvailable() + .needCheckDiskUsage() + .addTags(Sets.newHashSet(tag)) .setStorageMedium(storageMedium); if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { builder.allowOnSameHost(); } BeSelectionPolicy policy = builder.build(); - List candidates = getCandidates(policy); + List beIds = selectBackendIdsByPolicy(policy, -1); + if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified) { + storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; + policy = builder.setStorageMedium(storageMedium).build(); + beIds = selectBackendIdsByPolicy(policy, -1); + } long minBeTabletsNum = Long.MAX_VALUE; int minIndex = -1; - for (int i = 0; i < candidates.size(); ++i) { - long tabletsNum = Env.getCurrentInvertedIndex() - .getTabletIdsByBackendId(candidates.get(i).getId()).size(); + for (int i = 0; i < beIds.size(); ++i) { + long tabletsNum = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beIds.get(i)).size(); if (tabletsNum < minBeTabletsNum) { minBeTabletsNum = tabletsNum; minIndex = i; @@ -503,40 +460,12 @@ public class SystemInfoService { return minIndex; } - public Map> getBeIdRoundRobinForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, - Map nextIndexs) throws DdlException { - Map> chosenBackendIds = Maps.newHashMap(); - Map allocMap = replicaAlloc.getAllocMap(); - short totalReplicaNum = 0; - for (Map.Entry entry : allocMap.entrySet()) { - BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() - .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) - .setStorageMedium(storageMedium); - if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { - builder.allowOnSameHost(); - } - - BeSelectionPolicy policy = builder.build(); - int nextIndex = nextIndexs.get(entry.getKey()); - List beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex); - nextIndexs.put(entry.getKey(), nextIndex + beIds.size()); - - if (beIds.isEmpty()) { - throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy); - } - chosenBackendIds.put(entry.getKey(), beIds); - totalReplicaNum += beIds.size(); - } - Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum()); - return chosenBackendIds; - } - /** * Select a set of backends for replica creation. * The following parameters need to be considered when selecting backends. * * @param replicaAlloc + * @param nextIndexs create tablet round robin next be index, when enable_round_robin_create_tablet * @param storageMedium * @param isStorageMediumSpecified * @param isOnlyForCheck set true if only used for check available backend @@ -544,7 +473,8 @@ public class SystemInfoService { * @throws DdlException */ public Map> selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, boolean isStorageMediumSpecified, + ReplicaAllocation replicaAlloc, Map nextIndexs, + TStorageMedium storageMedium, boolean isStorageMediumSpecified, boolean isOnlyForCheck) throws DdlException { Map copiedBackends = Maps.newHashMap(idToBackendRef); @@ -561,12 +491,17 @@ public class SystemInfoService { List failedEntries = Lists.newArrayList(); for (Map.Entry entry : allocMap.entrySet()) { + Tag tag = entry.getKey(); BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) .setStorageMedium(storageMedium); if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { builder.allowOnSameHost(); } + if (Config.enable_round_robin_create_tablet) { + builder.setEnableRoundRobin(true); + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } BeSelectionPolicy policy = builder.build(); List beIds = selectBackendIdsByPolicy(policy, entry.getValue()); @@ -574,9 +509,16 @@ public class SystemInfoService { // if only for check, no need to retry different storage medium to get backend if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) { storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; - policy = builder.setStorageMedium(storageMedium).build(); + builder.setStorageMedium(storageMedium); + if (Config.enable_round_robin_create_tablet) { + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } + policy = builder.build(); beIds = selectBackendIdsByPolicy(policy, entry.getValue()); } + if (Config.enable_round_robin_create_tablet) { + nextIndexs.put(tag, policy.nextRoundRobinIndex); + } // after retry different storage medium, it's still empty if (beIds.isEmpty()) { LOG.error("failed backend(s) for policy:" + policy); @@ -605,7 +547,7 @@ public class SystemInfoService { /** * Select a set of backends by the given policy. * - * @param policy + * @param policy if policy is enableRoundRobin, will update its nextRoundRobinIndex * @param number number of backends which need to be selected. -1 means return as many as possible. * @return return #number of backend ids, * or empty set if no backends match the policy, or the number of matched backends is less than "number"; @@ -613,50 +555,77 @@ public class SystemInfoService { public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { Preconditions.checkArgument(number >= -1); List candidates = policy.getCandidateBackends(idToBackendRef.values()); - if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) { + if (candidates.size() < number || candidates.isEmpty()) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } + // If only need one Backend, just return a random one. - if (number == 1) { + if (number == 1 && !policy.enableRoundRobin) { Collections.shuffle(candidates); return Lists.newArrayList(candidates.get(0).getId()); } - if (policy.allowOnSameHost) { - Collections.shuffle(candidates); - if (number == -1) { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); - } else { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + boolean hasSameHost = false; + if (!policy.allowOnSameHost) { + // for each host, random select one backend. + Map> backendMaps = Maps.newHashMap(); + for (Backend backend : candidates) { + if (backendMaps.containsKey(backend.getHost())) { + backendMaps.get(backend.getHost()).add(backend); + } else { + List list = Lists.newArrayList(); + list.add(backend); + backendMaps.put(backend.getHost(), list); + } + } + + candidates.clear(); + for (List list : backendMaps.values()) { + if (list.size() > 1) { + Collections.shuffle(list); + hasSameHost = true; + } + candidates.add(list.get(0)); } } - // for each host, random select one backend. - Map> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); - } - } - candidates.clear(); - for (List list : backendMaps.values()) { - Collections.shuffle(list); - candidates.add(list.get(0)); - } - if (number != -1 && candidates.size() < number) { + if (candidates.size() < number) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } - Collections.shuffle(candidates); - if (number != -1) { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + + if (policy.enableRoundRobin) { + if (!policy.allowOnSameHost && hasSameHost) { + // not allow same host and has same host, + // then we compare them with their host + Collections.sort(candidates, new BeHostComparator()); + } else { + Collections.sort(candidates, new BeIdComparator()); + } + + if (policy.nextRoundRobinIndex < 0) { + policy.nextRoundRobinIndex = new SecureRandom().nextInt(candidates.size()); + } + + int realIndex = policy.nextRoundRobinIndex % candidates.size(); + List partialOrderList = new ArrayList(); + partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) + .stream().map(Backend::getId).collect(Collectors.toList())); + partialOrderList.addAll(candidates.subList(0, realIndex) + .stream().map(Backend::getId).collect(Collectors.toList())); + + List result = number == -1 ? partialOrderList : partialOrderList.subList(0, number); + policy.nextRoundRobinIndex = realIndex + result.size(); + + return result; } else { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + Collections.shuffle(candidates); + if (number != -1) { + return candidates.subList(0, number).stream().map(Backend::getId).collect(Collectors.toList()); + } else { + return candidates.stream().map(Backend::getId).collect(Collectors.toList()); + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 64e92b35c8..d361777fdd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -39,6 +39,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.persist.EditLog; +import org.apache.doris.resource.Tag; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -54,6 +55,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; @@ -153,12 +155,14 @@ public class RestoreJobTest { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, - false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { public synchronized List selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) { + ReplicaAllocation replicaAlloc, Map nextIndexs, + TStorageMedium medium, boolean isStorageMediumSpecified, + boolean isOnlyForCheck) { List beIds = Lists.newArrayList(); beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java index 14367ea731..c53715cd81 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java @@ -52,7 +52,8 @@ public class ReplicaAllocationTest { public void setUp() throws DdlException { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, Maps.newHashMap(), + (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { Map> selectBackendIdsForReplicaCreation() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index bf57f21f02..61228c821a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -150,7 +150,7 @@ public class CanalSyncDataTest { result = execPlanFragmentParams; systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, - (TStorageMedium) any, false, true); + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = backendIds; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 7c5556e8cf..22e12b37da 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -404,7 +404,7 @@ public class SystemInfoServiceTest { Map beCounterMap = Maps.newHashMap(); for (int i = 0; i < 10000; ++i) { Map> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc, - TStorageMedium.HDD, false, false); + Maps.newHashMap(), TStorageMedium.HDD, false, false); Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size()); for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) { beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);