[refactor](create tablet) default create tablet round robin (#28911)
This commit is contained in:
@ -2075,7 +2075,13 @@ public class Config extends ConfigBase {
|
||||
public static boolean skip_localhost_auth_check = true;
|
||||
|
||||
@ConfField(mutable = true)
|
||||
public static boolean enable_round_robin_create_tablet = false;
|
||||
public static boolean enable_round_robin_create_tablet = true;
|
||||
|
||||
@ConfField(mutable = true, masterOnly = true, description = {
|
||||
"创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡",
|
||||
"When creating tablet of a partition, always start from the first BE. "
|
||||
+ "Note: This method may cause BE imbalance"})
|
||||
public static boolean create_tablet_round_robin_from_start = false;
|
||||
|
||||
/**
|
||||
* To prevent different types (V1, V2, V3) of behavioral inconsistencies,
|
||||
|
||||
@ -1113,6 +1113,7 @@ public class RestoreJob extends AbstractJob {
|
||||
long visibleVersion = remotePart.getVisibleVersion();
|
||||
|
||||
// tablets
|
||||
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
|
||||
for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
|
||||
int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
|
||||
int remotetabletSize = remoteIdx.getTablets().size();
|
||||
@ -1127,7 +1128,7 @@ public class RestoreJob extends AbstractJob {
|
||||
// replicas
|
||||
try {
|
||||
Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false);
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false);
|
||||
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
|
||||
for (Long beId : entry.getValue()) {
|
||||
long newReplicaId = env.getNextId();
|
||||
|
||||
@ -589,6 +589,7 @@ public class OlapTable extends Table {
|
||||
}
|
||||
|
||||
// for each partition, reset rollup index map
|
||||
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
|
||||
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
|
||||
Partition partition = entry.getValue();
|
||||
// entry.getKey() is the new partition id, use it to get the restore specified replica allocation
|
||||
@ -616,7 +617,7 @@ public class OlapTable extends Table {
|
||||
try {
|
||||
Map<Tag, List<Long>> tag2beIds =
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
|
||||
replicaAlloc, null, false, false);
|
||||
replicaAlloc, nextIndexs, null, false, false);
|
||||
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
|
||||
for (Long beId : entry3.getValue()) {
|
||||
long newReplicaId = env.getNextId();
|
||||
|
||||
@ -40,10 +40,12 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Range;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -228,7 +230,8 @@ public class DynamicPartitionUtil {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val);
|
||||
}
|
||||
ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val));
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, Maps.newHashMap(),
|
||||
null, false, true);
|
||||
}
|
||||
|
||||
private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum,
|
||||
@ -237,14 +240,16 @@ public class DynamicPartitionUtil {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO);
|
||||
}
|
||||
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
|
||||
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null,
|
||||
false, true);
|
||||
if (hotPartitionNum <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false,
|
||||
true);
|
||||
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
|
||||
TStorageMedium.SSD, false, true);
|
||||
} catch (DdlException e) {
|
||||
throw new DdlException("Failed to find enough backend for ssd storage medium. When setting "
|
||||
+ DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store "
|
||||
|
||||
@ -1069,6 +1069,7 @@ public class PropertyAnalyzer {
|
||||
allocationVal = allocationVal.replaceAll(" ", "");
|
||||
String[] locations = allocationVal.split(",");
|
||||
int totalReplicaNum = 0;
|
||||
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
|
||||
for (String location : locations) {
|
||||
String[] parts = location.split(":");
|
||||
if (parts.length != 2) {
|
||||
@ -1092,7 +1093,7 @@ public class PropertyAnalyzer {
|
||||
try {
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
systemInfoService.selectBackendIdsForReplicaCreation(
|
||||
replicaAlloc, null, false, true);
|
||||
replicaAlloc, nextIndexs, null, false, true);
|
||||
} catch (DdlException ddlException) {
|
||||
throw new AnalysisException(ddlException.getMessage());
|
||||
}
|
||||
|
||||
@ -2743,6 +2743,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified)
|
||||
throws DdlException {
|
||||
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
|
||||
GroupId groupId = null;
|
||||
if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
|
||||
@ -2762,16 +2763,18 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
backendsPerBucketSeq = Maps.newHashMap();
|
||||
}
|
||||
|
||||
TStorageMedium storageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium();
|
||||
Map<Tag, Integer> nextIndexs = new HashMap<>();
|
||||
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
|
||||
int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(),
|
||||
tabletMeta.getStorageMedium());
|
||||
if (startPos == -1) {
|
||||
throw new DdlException("The number of BEs that match the policy is insufficient");
|
||||
for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
|
||||
int startPos = -1;
|
||||
if (Config.create_tablet_round_robin_from_start) {
|
||||
startPos = 0;
|
||||
} else {
|
||||
startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium,
|
||||
isStorageMediumSpecified);
|
||||
}
|
||||
nextIndexs.put(entry.getKey(), startPos);
|
||||
nextIndexs.put(tag, startPos);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2788,27 +2791,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
if (chooseBackendsArbitrary) {
|
||||
// This is the first colocate table in the group, or just a normal table,
|
||||
// choose backends
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
if (!Config.disable_storage_medium_check) {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
|
||||
nextIndexs);
|
||||
} else {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, null,
|
||||
nextIndexs);
|
||||
}
|
||||
} else {
|
||||
if (!Config.disable_storage_medium_check) {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
|
||||
isStorageMediumSpecified, false);
|
||||
} else {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, null,
|
||||
isStorageMediumSpecified, false);
|
||||
}
|
||||
}
|
||||
chosenBackendIds = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
|
||||
storageMedium, isStorageMediumSpecified, false);
|
||||
|
||||
for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
|
||||
backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
|
||||
|
||||
@ -51,6 +51,11 @@ public class BeSelectionPolicy {
|
||||
public boolean preferComputeNode = false;
|
||||
public int expectBeNum = 0;
|
||||
|
||||
public boolean enableRoundRobin = false;
|
||||
// if enable round robin, choose next be from nextRoundRobinIndex
|
||||
// call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex
|
||||
public int nextRoundRobinIndex = -1;
|
||||
|
||||
public List<String> preferredLocations = new ArrayList<>();
|
||||
|
||||
private BeSelectionPolicy() {
|
||||
@ -114,6 +119,16 @@ public class BeSelectionPolicy {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setEnableRoundRobin(boolean enableRoundRobin) {
|
||||
policy.enableRoundRobin = enableRoundRobin;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) {
|
||||
policy.nextRoundRobinIndex = nextRoundRobinIndex;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BeSelectionPolicy build() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
@ -54,6 +54,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
@ -415,86 +416,42 @@ public class SystemInfoService {
|
||||
return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
class BeComparator implements Comparator<Backend> {
|
||||
class BeIdComparator implements Comparator<Backend> {
|
||||
public int compare(Backend a, Backend b) {
|
||||
return (int) (a.getId() - b.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number,
|
||||
int nextIndex) {
|
||||
Preconditions.checkArgument(number >= -1);
|
||||
List<Backend> candidates = getCandidates(policy);
|
||||
if (number != -1 && candidates.size() < number) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
|
||||
return Lists.newArrayList();
|
||||
class BeHostComparator implements Comparator<Backend> {
|
||||
public int compare(Backend a, Backend b) {
|
||||
return a.getHost().compareTo(b.getHost());
|
||||
}
|
||||
|
||||
int realIndex = nextIndex % candidates.size();
|
||||
List<Long> partialOrderList = new ArrayList<Long>();
|
||||
partialOrderList.addAll(candidates.subList(realIndex, candidates.size())
|
||||
.stream().map(b -> b.getId()).collect(Collectors.toList()));
|
||||
partialOrderList.addAll(candidates.subList(0, realIndex)
|
||||
.stream().map(b -> b.getId()).collect(Collectors.toList()));
|
||||
|
||||
if (number == -1) {
|
||||
return partialOrderList;
|
||||
} else {
|
||||
return partialOrderList.subList(0, number);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Backend> getCandidates(BeSelectionPolicy policy) {
|
||||
List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values());
|
||||
if (candidates.isEmpty()) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size());
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
if (!policy.allowOnSameHost) {
|
||||
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
|
||||
for (Backend backend : candidates) {
|
||||
if (backendMaps.containsKey(backend.getHost())) {
|
||||
backendMaps.get(backend.getHost()).add(backend);
|
||||
} else {
|
||||
List<Backend> list = Lists.newArrayList();
|
||||
list.add(backend);
|
||||
backendMaps.put(backend.getHost(), list);
|
||||
}
|
||||
}
|
||||
candidates.clear();
|
||||
for (List<Backend> list : backendMaps.values()) {
|
||||
candidates.add(list.get(0));
|
||||
}
|
||||
}
|
||||
|
||||
if (candidates.isEmpty()) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size());
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
Collections.sort(candidates, new BeComparator());
|
||||
return candidates;
|
||||
}
|
||||
|
||||
// Select the smallest number of tablets as the starting position of
|
||||
// round robin in the BE that match the policy
|
||||
public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) {
|
||||
public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium, boolean isStorageMediumSpecified) {
|
||||
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder()
|
||||
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
|
||||
.needScheduleAvailable()
|
||||
.needCheckDiskUsage()
|
||||
.addTags(Sets.newHashSet(tag))
|
||||
.setStorageMedium(storageMedium);
|
||||
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
|
||||
builder.allowOnSameHost();
|
||||
}
|
||||
|
||||
BeSelectionPolicy policy = builder.build();
|
||||
List<Backend> candidates = getCandidates(policy);
|
||||
List<Long> beIds = selectBackendIdsByPolicy(policy, -1);
|
||||
if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified) {
|
||||
storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD;
|
||||
policy = builder.setStorageMedium(storageMedium).build();
|
||||
beIds = selectBackendIdsByPolicy(policy, -1);
|
||||
}
|
||||
|
||||
long minBeTabletsNum = Long.MAX_VALUE;
|
||||
int minIndex = -1;
|
||||
for (int i = 0; i < candidates.size(); ++i) {
|
||||
long tabletsNum = Env.getCurrentInvertedIndex()
|
||||
.getTabletIdsByBackendId(candidates.get(i).getId()).size();
|
||||
for (int i = 0; i < beIds.size(); ++i) {
|
||||
long tabletsNum = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beIds.get(i)).size();
|
||||
if (tabletsNum < minBeTabletsNum) {
|
||||
minBeTabletsNum = tabletsNum;
|
||||
minIndex = i;
|
||||
@ -503,40 +460,12 @@ public class SystemInfoService {
|
||||
return minIndex;
|
||||
}
|
||||
|
||||
public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation(
|
||||
ReplicaAllocation replicaAlloc, TStorageMedium storageMedium,
|
||||
Map<Tag, Integer> nextIndexs) throws DdlException {
|
||||
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
|
||||
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
|
||||
short totalReplicaNum = 0;
|
||||
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
|
||||
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder()
|
||||
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
|
||||
.setStorageMedium(storageMedium);
|
||||
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
|
||||
builder.allowOnSameHost();
|
||||
}
|
||||
|
||||
BeSelectionPolicy policy = builder.build();
|
||||
int nextIndex = nextIndexs.get(entry.getKey());
|
||||
List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex);
|
||||
nextIndexs.put(entry.getKey(), nextIndex + beIds.size());
|
||||
|
||||
if (beIds.isEmpty()) {
|
||||
throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy);
|
||||
}
|
||||
chosenBackendIds.put(entry.getKey(), beIds);
|
||||
totalReplicaNum += beIds.size();
|
||||
}
|
||||
Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
|
||||
return chosenBackendIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a set of backends for replica creation.
|
||||
* The following parameters need to be considered when selecting backends.
|
||||
*
|
||||
* @param replicaAlloc
|
||||
* @param nextIndexs create tablet round robin next be index, when enable_round_robin_create_tablet
|
||||
* @param storageMedium
|
||||
* @param isStorageMediumSpecified
|
||||
* @param isOnlyForCheck set true if only used for check available backend
|
||||
@ -544,7 +473,8 @@ public class SystemInfoService {
|
||||
* @throws DdlException
|
||||
*/
|
||||
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
|
||||
ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, boolean isStorageMediumSpecified,
|
||||
ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs,
|
||||
TStorageMedium storageMedium, boolean isStorageMediumSpecified,
|
||||
boolean isOnlyForCheck)
|
||||
throws DdlException {
|
||||
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
|
||||
@ -561,12 +491,17 @@ public class SystemInfoService {
|
||||
List<String> failedEntries = Lists.newArrayList();
|
||||
|
||||
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
|
||||
Tag tag = entry.getKey();
|
||||
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder()
|
||||
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
|
||||
.setStorageMedium(storageMedium);
|
||||
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
|
||||
builder.allowOnSameHost();
|
||||
}
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
builder.setEnableRoundRobin(true);
|
||||
builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1));
|
||||
}
|
||||
|
||||
BeSelectionPolicy policy = builder.build();
|
||||
List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
|
||||
@ -574,9 +509,16 @@ public class SystemInfoService {
|
||||
// if only for check, no need to retry different storage medium to get backend
|
||||
if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) {
|
||||
storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD;
|
||||
policy = builder.setStorageMedium(storageMedium).build();
|
||||
builder.setStorageMedium(storageMedium);
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1));
|
||||
}
|
||||
policy = builder.build();
|
||||
beIds = selectBackendIdsByPolicy(policy, entry.getValue());
|
||||
}
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
nextIndexs.put(tag, policy.nextRoundRobinIndex);
|
||||
}
|
||||
// after retry different storage medium, it's still empty
|
||||
if (beIds.isEmpty()) {
|
||||
LOG.error("failed backend(s) for policy:" + policy);
|
||||
@ -605,7 +547,7 @@ public class SystemInfoService {
|
||||
/**
|
||||
* Select a set of backends by the given policy.
|
||||
*
|
||||
* @param policy
|
||||
* @param policy if policy is enableRoundRobin, will update its nextRoundRobinIndex
|
||||
* @param number number of backends which need to be selected. -1 means return as many as possible.
|
||||
* @return return #number of backend ids,
|
||||
* or empty set if no backends match the policy, or the number of matched backends is less than "number";
|
||||
@ -613,50 +555,77 @@ public class SystemInfoService {
|
||||
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
|
||||
Preconditions.checkArgument(number >= -1);
|
||||
List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values());
|
||||
if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
|
||||
if (candidates.size() < number || candidates.isEmpty()) {
|
||||
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
// If only need one Backend, just return a random one.
|
||||
if (number == 1) {
|
||||
if (number == 1 && !policy.enableRoundRobin) {
|
||||
Collections.shuffle(candidates);
|
||||
return Lists.newArrayList(candidates.get(0).getId());
|
||||
}
|
||||
|
||||
if (policy.allowOnSameHost) {
|
||||
Collections.shuffle(candidates);
|
||||
if (number == -1) {
|
||||
return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
|
||||
} else {
|
||||
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
|
||||
boolean hasSameHost = false;
|
||||
if (!policy.allowOnSameHost) {
|
||||
// for each host, random select one backend.
|
||||
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
|
||||
for (Backend backend : candidates) {
|
||||
if (backendMaps.containsKey(backend.getHost())) {
|
||||
backendMaps.get(backend.getHost()).add(backend);
|
||||
} else {
|
||||
List<Backend> list = Lists.newArrayList();
|
||||
list.add(backend);
|
||||
backendMaps.put(backend.getHost(), list);
|
||||
}
|
||||
}
|
||||
|
||||
candidates.clear();
|
||||
for (List<Backend> list : backendMaps.values()) {
|
||||
if (list.size() > 1) {
|
||||
Collections.shuffle(list);
|
||||
hasSameHost = true;
|
||||
}
|
||||
candidates.add(list.get(0));
|
||||
}
|
||||
}
|
||||
|
||||
// for each host, random select one backend.
|
||||
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
|
||||
for (Backend backend : candidates) {
|
||||
if (backendMaps.containsKey(backend.getHost())) {
|
||||
backendMaps.get(backend.getHost()).add(backend);
|
||||
} else {
|
||||
List<Backend> list = Lists.newArrayList();
|
||||
list.add(backend);
|
||||
backendMaps.put(backend.getHost(), list);
|
||||
}
|
||||
}
|
||||
candidates.clear();
|
||||
for (List<Backend> list : backendMaps.values()) {
|
||||
Collections.shuffle(list);
|
||||
candidates.add(list.get(0));
|
||||
}
|
||||
if (number != -1 && candidates.size() < number) {
|
||||
if (candidates.size() < number) {
|
||||
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
Collections.shuffle(candidates);
|
||||
if (number != -1) {
|
||||
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
|
||||
|
||||
if (policy.enableRoundRobin) {
|
||||
if (!policy.allowOnSameHost && hasSameHost) {
|
||||
// not allow same host and has same host,
|
||||
// then we compare them with their host
|
||||
Collections.sort(candidates, new BeHostComparator());
|
||||
} else {
|
||||
Collections.sort(candidates, new BeIdComparator());
|
||||
}
|
||||
|
||||
if (policy.nextRoundRobinIndex < 0) {
|
||||
policy.nextRoundRobinIndex = new SecureRandom().nextInt(candidates.size());
|
||||
}
|
||||
|
||||
int realIndex = policy.nextRoundRobinIndex % candidates.size();
|
||||
List<Long> partialOrderList = new ArrayList<Long>();
|
||||
partialOrderList.addAll(candidates.subList(realIndex, candidates.size())
|
||||
.stream().map(Backend::getId).collect(Collectors.toList()));
|
||||
partialOrderList.addAll(candidates.subList(0, realIndex)
|
||||
.stream().map(Backend::getId).collect(Collectors.toList()));
|
||||
|
||||
List<Long> result = number == -1 ? partialOrderList : partialOrderList.subList(0, number);
|
||||
policy.nextRoundRobinIndex = realIndex + result.size();
|
||||
|
||||
return result;
|
||||
} else {
|
||||
return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
|
||||
Collections.shuffle(candidates);
|
||||
if (number != -1) {
|
||||
return candidates.subList(0, number).stream().map(Backend::getId).collect(Collectors.toList());
|
||||
} else {
|
||||
return candidates.stream().map(Backend::getId).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.fs.FileSystemFactory;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -54,6 +55,7 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.zip.Adler32;
|
||||
@ -153,12 +155,14 @@ public class RestoreJobTest {
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any,
|
||||
false, true);
|
||||
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
|
||||
Maps.newHashMap(), (TStorageMedium) any, false, true);
|
||||
minTimes = 0;
|
||||
result = new Delegate() {
|
||||
public synchronized List<Long> selectBackendIdsForReplicaCreation(
|
||||
ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) {
|
||||
ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs,
|
||||
TStorageMedium medium, boolean isStorageMediumSpecified,
|
||||
boolean isOnlyForCheck) {
|
||||
List<Long> beIds = Lists.newArrayList();
|
||||
beIds.add(CatalogMocker.BACKEND1_ID);
|
||||
beIds.add(CatalogMocker.BACKEND2_ID);
|
||||
|
||||
@ -52,7 +52,8 @@ public class ReplicaAllocationTest {
|
||||
public void setUp() throws DdlException {
|
||||
new Expectations() {
|
||||
{
|
||||
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, false, true);
|
||||
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, Maps.newHashMap(),
|
||||
(TStorageMedium) any, false, true);
|
||||
minTimes = 0;
|
||||
result = new Delegate() {
|
||||
Map<Tag, List<Long>> selectBackendIdsForReplicaCreation() {
|
||||
|
||||
@ -150,7 +150,7 @@ public class CanalSyncDataTest {
|
||||
result = execPlanFragmentParams;
|
||||
|
||||
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
|
||||
(TStorageMedium) any, false, true);
|
||||
Maps.newHashMap(), (TStorageMedium) any, false, true);
|
||||
minTimes = 0;
|
||||
result = backendIds;
|
||||
|
||||
|
||||
@ -404,7 +404,7 @@ public class SystemInfoServiceTest {
|
||||
Map<Long, Integer> beCounterMap = Maps.newHashMap();
|
||||
for (int i = 0; i < 10000; ++i) {
|
||||
Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
|
||||
TStorageMedium.HDD, false, false);
|
||||
Maps.newHashMap(), TStorageMedium.HDD, false, false);
|
||||
Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
|
||||
for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
|
||||
beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
|
||||
|
||||
Reference in New Issue
Block a user