diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 4fcd815072..be58c13985 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2239,7 +2239,28 @@ public class Config extends ConfigBase { "When file cache is enabled, the number of virtual nodes of each node in the consistent hash algorithm. " + "The larger the value, the more uniform the distribution of the hash algorithm, " + "but it will increase the memory overhead."}) - public static int virtual_node_number = 2048; + public static int split_assigner_virtual_node_number = 256; + + @ConfField(mutable = true, description = { + "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。", + "Local node soft affinity optimization. Prefer local replication node."}) + public static boolean split_assigner_optimized_local_scheduling = true; + + @ConfField(mutable = true, description = { + "随机算法最小的候选数目,会选取相对最空闲的节点。", + "The random algorithm has the smallest number of candidates and will select the most idle node."}) + public static int split_assigner_min_random_candidate_num = 2; + + @ConfField(mutable = true, description = { + "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。", + "The consistent hash algorithm has the smallest number of candidates and will select the most idle node."}) + public static int split_assigner_min_consistent_hash_candidate_num = 2; + + @ConfField(mutable = true, description = { + "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。", + "The maximum difference in the number of splits between nodes. " + + "If this number is exceeded, the splits will be redistributed."}) + public static int split_assigner_max_split_num_variance = 1; @ConfField(description = { "控制统计信息的自动触发作业执行记录的持久化行数", diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java new file mode 100644 index 0000000000..f93db510e7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
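The four split_assigner_* options above drive the new split assigner in FederationBackendPolicy further down in this patch: the two candidate-count options bound how many backends are considered per split, and split_assigner_max_split_num_variance bounds how uneven the per-backend load may become before splits are rebalanced. A minimal sketch of how a candidate count turns into a final choice, modeled on chooseNodeForSplit below (consistentHash and assignedWeightPerBackend stand for the policy's internal state after init(); this is an illustration, not code from the patch):

    // Take N candidates from the consistent-hash ring, then keep the least-loaded one.
    List<Backend> candidates = consistentHash.getNode(split,
            Config.split_assigner_min_consistent_hash_candidate_num);
    Backend chosen = null;
    long minWeight = Long.MAX_VALUE;
    for (Backend candidate : candidates) {
        long weight = assignedWeightPerBackend.get(candidate);
        if (weight <= minWeight) {
            chosen = candidate;
            minWeight = weight;
        }
    }
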
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * A priority queue with constant time contains(E) and log time remove(E) + * Ties are broken by insertion order + */ +public final class IndexedPriorityQueue + implements UpdateablePriorityQueue { + private final Map> index = new HashMap<>(); + private final Set> queue; + private long generation; + + public IndexedPriorityQueue() { + this(PriorityOrdering.HIGH_TO_LOW); + } + + public IndexedPriorityQueue(PriorityOrdering priorityOrdering) { + switch (priorityOrdering) { + case LOW_TO_HIGH: + queue = new TreeSet<>( + Comparator.comparingLong((Entry entry) -> entry.getPriority()) + .thenComparingLong(Entry::getGeneration)); + break; + case HIGH_TO_LOW: + queue = new TreeSet<>((entry1, entry2) -> { + int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority()); + if (priorityComparison != 0) { + return priorityComparison; + } + return Long.compare(entry1.getGeneration(), entry2.getGeneration()); + }); + break; + default: + throw new IllegalArgumentException(); + } + } + + @Override + public boolean addOrUpdate(E element, long priority) { + Entry entry = index.get(element); + if (entry != null) { + if (entry.getPriority() == priority) { + return false; + } + queue.remove(entry); + Entry newEntry = new Entry<>(element, priority, entry.getGeneration()); + queue.add(newEntry); + index.put(element, newEntry); + return false; + } + Entry newEntry = new Entry<>(element, priority, generation); + generation++; + queue.add(newEntry); + index.put(element, newEntry); + return true; + } + + @Override + public boolean contains(E element) { + return index.containsKey(element); + } + + @Override + public boolean remove(E element) { + Entry entry = index.remove(element); + if (entry != null) { + queue.remove(entry); + return true; + } + return false; + } + + @Override + public E poll() { + Entry entry = pollEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public E peek() { + Entry entry = peekEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + public Prioritized getPrioritized(E element) { + Entry entry = index.get(element); + if (entry == null) { + return null; + } + + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Prioritized pollPrioritized() { + Entry entry = pollEntry(); + if (entry == null) { + return null; + } + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + private Entry pollEntry() { + Iterator> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + Entry entry = iterator.next(); + iterator.remove(); + Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index"); + return entry; + } + + public Prioritized peekPrioritized() { + Entry entry = peekEntry(); + if (entry == null) { + return null; + } + return new 
Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Entry peekEntry() { + Iterator> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + return iterator.next(); + } + + @Override + public Iterator iterator() { + return Iterators.transform(queue.iterator(), Entry::getValue); + } + + public enum PriorityOrdering { + LOW_TO_HIGH, + HIGH_TO_LOW + } + + private static final class Entry { + private final E value; + private final long priority; + private final long generation; + + private Entry(E value, long priority, long generation) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + this.generation = generation; + } + + public E getValue() { + return value; + } + + public long getPriority() { + return priority; + } + + public long getGeneration() { + return generation; + } + } + + public static class Prioritized { + private final V value; + private final long priority; + + public Prioritized(V value, long priority) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + } + + public V getValue() { + return value; + } + + public long getPriority() { + return priority; + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java new file mode 100644 index 0000000000..6dc58a3523 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java +// and modified by Doris + +package org.apache.doris.common; + +interface Queue { + boolean contains(E element); + + boolean remove(E element); + + E poll(); + + E peek(); + + int size(); + + boolean isEmpty(); +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java new file mode 100644 index 0000000000..11d1860435 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java +// and modified by Doris + +package org.apache.doris.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; + +public class ResettableRandomizedIterator + implements Iterator { + private final List list; + private int position; + + public ResettableRandomizedIterator(Collection elements) { + this.list = new ArrayList<>(elements); + } + + public void reset() { + position = 0; + } + + @Override + public boolean hasNext() { + return position < list.size(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + int position = ThreadLocalRandom.current().nextInt(this.position, list.size()); + + T result = list.set(position, list.get(this.position)); + list.set(this.position, result); + this.position++; + + return result; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java new file mode 100644 index 0000000000..c9c0a2b3de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
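ResettableRandomizedIterator above backs the RANDOM selection strategy in FederationBackendPolicy: next() swaps a randomly picked remaining element into the current position (a Fisher-Yates shuffle), so each pass yields the elements in random order, and reset() starts a new pass over the same backing list without copying it. A short usage sketch, assuming backends is the list of alive backends:

    ResettableRandomizedIterator<Backend> candidates = new ResettableRandomizedIterator<>(backends);
    List<Backend> picked = new ArrayList<>();
    while (picked.size() < Config.split_assigner_min_random_candidate_num && candidates.hasNext()) {
        picked.add(candidates.next());   // what selectNodes(limit, candidates) does below
    }
    candidates.reset();                  // next split starts a fresh random order
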
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/UpdateablePriorityQueue.java +// and modified by Doris + +package org.apache.doris.common; + +interface UpdateablePriorityQueue + extends Queue, Iterable { + boolean addOrUpdate(E element, long priority); +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java index 4ef6e4168f..85199ad32f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java @@ -17,11 +17,16 @@ package org.apache.doris.common.util; +import com.google.common.collect.ImmutableList; import com.google.common.hash.Funnel; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -89,14 +94,38 @@ public class ConsistentHash { } } - public N getNode(K key) { - if (ring.isEmpty()) { - return null; + public List getNode(K key, int count) { + int nodeCount = ring.values().size(); + if (count > nodeCount) { + count = nodeCount; } + + Set uniqueNodes = new LinkedHashSet<>(); + Hasher hasher = hashFunction.newHasher(); Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong(); + SortedMap tailMap = ring.tailMap(hashKey); - hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey(); - return ring.get(hashKey).getNode(); + // Start reading from tail + for (Map.Entry entry : tailMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + + if (uniqueNodes.size() < count) { + // Start reading from the head as we have exhausted tail + SortedMap headMap = ring.headMap(hashKey); + for (Map.Entry entry : headMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + } + + return ImmutableList.copyOf(uniqueNodes); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java index a4751b5205..cbfe318881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.UserException; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TScanRangeLocations; @@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode { // set to false means this scan node does not need to check column priv. protected boolean needCheckColumnPriv; - protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); + protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableFileCache) + ? 
new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode { return scanRangeLocations.size(); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index ade03291c3..df0e955ea8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -14,28 +14,38 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is referenced from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java +// and modified by Doris package org.apache.doris.planner.external; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.IndexedPriorityQueue; +import org.apache.doris.common.ResettableRandomizedIterator; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.system.SystemInfoService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; @@ -45,12 +55,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -59,35 +73,47 @@ public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); private final List backends = Lists.newArrayList(); private final Map> backendMap = Maps.newHashMap(); - private final SecureRandom random = new SecureRandom(); - private ConsistentHash consistentHash; + + public Map getAssignedWeightPerBackend() { + return assignedWeightPerBackend; + } + + private 
Map assignedWeightPerBackend = Maps.newHashMap(); + + private ConsistentHash consistentHash; private int nextBe = 0; private boolean initialized = false; + private NodeSelectionStrategy nodeSelectionStrategy; + private boolean enableSplitsRedistribution = true; + // Create a ConsistentHash ring may be a time-consuming operation, so we cache it. - private static LoadingCache> consistentHashCache; + private static LoadingCache> consistentHashCache; static { consistentHashCache = CacheBuilder.newBuilder().maximumSize(5) - .build(new CacheLoader>() { + .build(new CacheLoader>() { @Override - public ConsistentHash load(HashCacheKey key) { - return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(), - new BackendHash(), key.bes, Config.virtual_node_number); + public ConsistentHash load(HashCacheKey key) { + return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(), + new BackendHash(), key.bes, Config.split_assigner_virtual_node_number); } }); } private static class HashCacheKey { // sorted backend ids as key - private List beIds; + private List beHashKeys; // backends is not part of key, just an attachment private List bes; HashCacheKey(List backends) { this.bes = backends; - this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList()); + this.beHashKeys = backends.stream().map(b -> + String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort())) + .sorted() + .collect(Collectors.toList()); } @Override @@ -98,20 +124,28 @@ public class FederationBackendPolicy { if (!(obj instanceof HashCacheKey)) { return false; } - return Objects.equals(beIds, ((HashCacheKey) obj).beIds); + return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys); } @Override public int hashCode() { - return Objects.hash(beIds); + return Objects.hash(beHashKeys); } @Override public String toString() { - return "HashCache{" + "beIds=" + beIds + '}'; + return "HashCache{" + "beHashKeys=" + beHashKeys + '}'; } } + public FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) { + this.nodeSelectionStrategy = nodeSelectionStrategy; + } + + public FederationBackendPolicy() { + this(NodeSelectionStrategy.ROUND_ROBIN); + } + public void init() throws UserException { if (!initialized) { init(Collections.emptyList()); @@ -152,6 +186,10 @@ public class FederationBackendPolicy { if (backends.isEmpty()) { throw new UserException("No available backends"); } + for (Backend backend : backends) { + assignedWeightPerBackend.put(backend, 0L); + } + backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); try { consistentHash = consistentHashCache.get(new HashCacheKey(backends)); @@ -166,23 +204,280 @@ public class FederationBackendPolicy { return selectedBackend; } - public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) { - return consistentHash.getNode(scanRangeLocations); + @VisibleForTesting + public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) { + this.enableSplitsRedistribution = enableSplitsRedistribution; } - // Try to find a local BE, if not exists, use `getNextBe` instead - public Backend getNextLocalBe(List hosts, TScanRangeLocations scanRangeLocations) { - List candidateBackends = Lists.newArrayListWithCapacity(hosts.size()); + public Multimap computeScanRangeAssignment(List splits) throws UserException { + // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible. 
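+        // Assignment then proceeds in three stages:
+        //   1. with split_assigner_optimized_local_scheduling, a split that carries host information is
+        //      assigned to the least-loaded backend on one of those hosts, if such a backend exists;
+        //      otherwise it falls through to stage 2;
+        //   2. each remaining split gets candidate backends according to nodeSelectionStrategy
+        //      (ROUND_ROBIN, RANDOM or CONSISTENT_HASHING) and the least-loaded candidate is chosen;
+        //   3. if splits were placed by locality or consistent hashing and redistribution is enabled,
+        //      equateDistribution() rebalances them until the weight gap between backends is within
+        //      split_assigner_max_split_num_variance standard splits.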
+ splits.sort((split1, split2) -> { + int pathComparison = split1.getPathString().compareTo(split2.getPathString()); + if (pathComparison != 0) { + return pathComparison; + } + + int startComparison = Long.compare(split1.getStart(), split2.getStart()); + if (startComparison != 0) { + return startComparison; + } + return Long.compare(split1.getLength(), split2.getLength()); + }); + + ListMultimap assignment = ArrayListMultimap.create(); + + List remainingSplits = null; + + List backends = new ArrayList<>(); + for (List backendList : backendMap.values()) { + backends.addAll(backendList); + } + ResettableRandomizedIterator randomCandidates = new ResettableRandomizedIterator<>(backends); + + boolean splitsToBeRedistributed = false; + + // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain + // locality information + if (Config.split_assigner_optimized_local_scheduling) { + remainingSplits = new ArrayList<>(splits.size()); + for (int i = 0; i < splits.size(); ++i) { + Split split = splits.get(i); + if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) { + List candidateNodes = selectExactNodes(backendMap, split.getHosts()); + + Optional chosenNode = candidateNodes.stream() + .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode))); + + if (chosenNode.isPresent()) { + Backend selectedBackend = chosenNode.get(); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + splitsToBeRedistributed = true; + continue; + } + } + remainingSplits.add(split); + } + } else { + remainingSplits = splits; + } + + for (Split split : remainingSplits) { + List candidateNodes; + if (!split.isRemotelyAccessible()) { + candidateNodes = selectExactNodes(backendMap, split.getHosts()); + } else { + switch (nodeSelectionStrategy) { + case ROUND_ROBIN: { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + candidateNodes = ImmutableList.of(selectedBackend); + break; + } + case RANDOM: { + randomCandidates.reset(); + candidateNodes = selectNodes(Config.split_assigner_min_random_candidate_num, randomCandidates); + break; + } + case CONSISTENT_HASHING: { + candidateNodes = consistentHash.getNode(split, + Config.split_assigner_min_consistent_hash_candidate_num); + splitsToBeRedistributed = true; + break; + } + default: { + throw new RuntimeException(); + } + } + } + if (candidateNodes.isEmpty()) { + LOG.debug("No nodes available to schedule {}. Available nodes {}", split, + backends); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); + } + + Backend selectedBackend = chooseNodeForSplit(candidateNodes); + List alternativeBackends = new ArrayList<>(candidateNodes); + alternativeBackends.remove(selectedBackend); + split.setAlternativeHosts( + alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList())); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + } + + if (enableSplitsRedistribution && splitsToBeRedistributed) { + equateDistribution(assignment); + } + return assignment; + } + + /** + * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and + * a minHeap based on the number of splits that are assigned to them. 
Splits are redistributed, one at a time, + * from a maxNode to a minNode until we have as uniform a distribution as possible. + * + * @param assignment the node-splits multimap after the first and the second stage + */ + private void equateDistribution(ListMultimap assignment) { + if (assignment.isEmpty()) { + return; + } + + List allNodes = new ArrayList<>(); + for (List backendList : backendMap.values()) { + allNodes.addAll(backendList); + } + Collections.sort(allNodes, Comparator.comparing(Backend::getId)); + + if (allNodes.size() < 2) { + return; + } + + IndexedPriorityQueue maxNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node)); + } + + IndexedPriorityQueue minNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node)); + } + + while (true) { + if (maxNodes.isEmpty()) { + return; + } + + // fetch min and max node + Backend maxNode = maxNodes.poll(); + Backend minNode = minNodes.poll(); + + // Allow some degree of non uniformity when assigning splits to nodes. Usually data distribution + // among nodes in a cluster won't be fully uniform (e.g. because hash function with non-uniform + // distribution is used like consistent hashing). In such case it makes sense to assign splits to nodes + // with data because of potential savings in network throughput and CPU time. + if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode) + <= SplitWeight.rawValueForStandardSplitCount(Config.split_assigner_max_split_num_variance)) { + return; + } + + // move split from max to min + Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode); + + assignedWeightPerBackend.put(maxNode, + assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue()); + assignedWeightPerBackend.put(minNode, Math.addExact( + assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue())); + + // add max back into maxNodes only if it still has assignments + if (assignment.containsKey(maxNode)) { + maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode)); + } + + // Add or update both the Priority Queues with the updated node priorities + maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode)); + } + } + + /** + * The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to + * redistribute a Non-local split if possible. This case is possible when there are multiple queries running + * simultaneously. If a Non-local split cannot be found in the maxNode, next split is selected and reassigned. 
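+     * The chosen split is removed from fromNode's assignment list and appended to toNode's; the caller
+     * (equateDistribution) is responsible for updating assignedWeightPerBackend for both nodes afterwards.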
+ */ + @VisibleForTesting + public static Split redistributeSplit(Multimap assignment, Backend fromNode, + Backend toNode) { + Iterator splitIterator = assignment.get(fromNode).iterator(); + Split splitToBeRedistributed = null; + while (splitIterator.hasNext()) { + Split split = splitIterator.next(); + // Try to select non-local split for redistribution + if (split.getHosts() != null && !isSplitLocal( + split.getHosts(), fromNode.getHost())) { + splitToBeRedistributed = split; + break; + } + } + // Select split if maxNode has no non-local splits in the current batch of assignment + if (splitToBeRedistributed == null) { + splitIterator = assignment.get(fromNode).iterator(); + while (splitIterator.hasNext()) { + splitToBeRedistributed = splitIterator.next(); + // if toNode has split replication, transfer this split firstly + if (splitToBeRedistributed.getHosts() != null && isSplitLocal( + splitToBeRedistributed.getHosts(), toNode.getHost())) { + break; + } + // if toNode is split alternative host, transfer this split firstly + if (splitToBeRedistributed.getAlternativeHosts() != null && isSplitLocal( + splitToBeRedistributed.getAlternativeHosts(), toNode.getHost())) { + break; + } + } + } + splitIterator.remove(); + assignment.put(toNode, splitToBeRedistributed); + return splitToBeRedistributed; + } + + private static boolean isSplitLocal(String[] splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + private static boolean isSplitLocal(List splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + public static List selectExactNodes(Map> backendMap, String[] hosts) { + Set chosen = new LinkedHashSet<>(); + for (String host : hosts) { - List backends = backendMap.get(host); - if (CollectionUtils.isNotEmpty(backends)) { - candidateBackends.add(backends.get(random.nextInt(backends.size()))); + if (backendMap.containsKey(host)) { + backendMap.get(host).stream() + .forEach(chosen::add); + } + } + return ImmutableList.copyOf(chosen); + } + + public static List selectNodes(int limit, Iterator candidates) { + Preconditions.checkArgument(limit > 0, "limit must be at least 1"); + + List selected = new ArrayList<>(limit); + while (selected.size() < limit && candidates.hasNext()) { + selected.add(candidates.next()); + } + + return selected; + } + + private Backend chooseNodeForSplit(List candidateNodes) { + Backend chosenNode = null; + long minWeight = Long.MAX_VALUE; + + for (Backend node : candidateNodes) { + long queuedWeight = assignedWeightPerBackend.get(node); + if (queuedWeight <= minWeight) { + chosenNode = node; + minWeight = queuedWeight; } } - return CollectionUtils.isEmpty(candidateBackends) - ? 
getNextConsistentBe(scanRangeLocations) - : candidateBackends.get(random.nextInt(candidateBackends.size())); + return chosenNode; } public int numBackends() { @@ -200,15 +495,13 @@ public class FederationBackendPolicy { } } - private static class ScanRangeHash implements Funnel { + private static class SplitHash implements Funnel { @Override - public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) { - Preconditions.checkState(scanRange.scan_range.isSetExtScanRange()); - for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) { - primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8)); - primitiveSink.putLong(desc.start_offset); - primitiveSink.putLong(desc.size); - } + public void funnel(Split split, PrimitiveSink primitiveSink) { + primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putLong(split.getStart()); + primitiveSink.putLong(split.getLength()); } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 4a3c3d2ff9..a374df5e60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -68,13 +68,14 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -314,76 +315,73 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); } - boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties); List pathPartitionKeys = getPathPartitionKeys(); - for (Split split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - TFileType locationType; - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } - TScanRangeLocations curLocations = newLocations(); - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. - boolean isACID = false; - if (fileSplit instanceof HiveSplit) { - HiveSplit hiveSplit = (HiveSplit) split; - isACID = hiveSplit.isACID(); - } - List partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) - : fileSplit.getPartitionValues(); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) split; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); + Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); + for (Backend backend : assignment.keySet()) { + Collection splits = assignment.get(backend); + for (Split split : splits) { + FileSplit fileSplit = (FileSplit) split; + TFileType locationType; + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } else { + locationType = getLocationType(fileSplit.getPath().toString()); } - transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - setScanParams(rangeDesc, fileSplit); + TScanRangeLocations curLocations = newLocations(); + // If fileSplit has partition values, use the values collected from hive partitions. + // Otherwise, use the values in file path. + boolean isACID = false; + if (fileSplit instanceof HiveSplit) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + isACID = hiveSplit.isACID(); + } + List partitionValuesFromPath = fileSplit.getPartitionValues() == null + ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, + false, isACID) : fileSplit.getPartitionValues(); - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend; - if (enableShortCircuitRead) { - // Try to find a local BE if enable hdfs short circuit read - selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations); - } else { - // Use consistent hash to assign the same scan range into the same backend among different queries - selectedBackend = backendPolicy.getNextConsistentBe(curLocations); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, + locationType); + TFileCompressType fileCompressType = getFileCompressType(fileSplit); + rangeDesc.setCompressType(fileCompressType); + if (isACID) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); + AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); + TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); + transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); + List deleteDeltaDescs = new ArrayList<>(); + for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { + TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); + deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); + deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); + deleteDeltaDescs.add(deleteDeltaDesc); + } + transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); + tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + + setScanParams(rangeDesc, fileSplit); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + TScanRangeLocation location = new TScanRangeLocation(); + setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + curLocations.addToLocations(location); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + scanRangeLocations.add(curLocations); + this.totalFileSize += fileSplit.getLength(); } - setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties); - location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - curLocations.addToLocations(location); - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), - fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); - scanRangeLocations.add(curLocations); - this.totalFileSize += fileSplit.getLength(); } + if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime(); } @@ -518,3 +516,4 @@ 
public abstract class FileQueryScanNode extends FileScanNode { protected abstract Map getLocationProperties() throws UserException; } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java index 4e9d0bae56..1221d3ad21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -42,6 +42,8 @@ public class FileSplit implements Split { // partitionValues would be ["part1", "part2"] protected List partitionValues; + protected List alternativeHosts; + public FileSplit(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts, List partitionValues) { this.path = path; @@ -67,6 +69,11 @@ public class FileSplit implements Split { return null; } + @Override + public String getPathString() { + return path.toString(); + } + public static class FileSplitCreator implements SplitCreator { public static final FileSplitCreator DEFAULT = new FileSplitCreator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java new file mode 100644 index 0000000000..4b4d10a424 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +public enum NodeSelectionStrategy { + ROUND_ROBIN, + RANDOM, + CONSISTENT_HASHING +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java new file mode 100644 index 0000000000..d8724017b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java +// and modified by Doris + +package org.apache.doris.planner.external; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.errorprone.annotations.DoNotCall; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.function.Function; + +public final class SplitWeight { + private static final long UNIT_VALUE = 100; + private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE + private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE); + + private final long value; + + private SplitWeight(long value) { + if (value <= 0) { + throw new IllegalArgumentException("value must be > 0, found: " + value); + } + this.value = value; + } + + /** + * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended + * primarily for JSON deserialization, and connectors should use not call this factory method directly + * to construct {@link SplitWeight} instances. Instead, connectors should use + * {@link SplitWeight#fromProportion(double)} + * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future. + */ + @JsonCreator + @DoNotCall // For JSON serialization only + public static SplitWeight fromRawValue(long value) { + return fromRawValueInternal(value); + } + + /** + * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight + * proportionally, i.e., a parameter of 1.0 will be equivalent to the standard weight + * and a value of 0.5 will be 1/2 of the standard split weight. Valid arguments + * must be greater than zero and finite. Connectors should prefer constructing split weights + * using this factory method rather than passing a raw integer value in case the integer representation + * of a standard split needs to change in the future. + * + * @param weight the proportional weight relative to a standard split, expressed as a double + * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion + */ + public static SplitWeight fromProportion(double weight) { + if (weight <= 0 || !Double.isFinite(weight)) { + throw new IllegalArgumentException("Invalid weight: " + weight); + } + // Must round up to avoid small weights rounding to 0 + return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE)); + } + + private static SplitWeight fromRawValueInternal(long value) { + return value == UNIT_VALUE ? 
STANDARD_WEIGHT : new SplitWeight(value); + } + + public static SplitWeight standard() { + return STANDARD_WEIGHT; + } + + public static long rawValueForStandardSplitCount(int splitCount) { + if (splitCount < 0) { + throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount); + } + return Math.multiplyExact(splitCount, UNIT_VALUE); + } + + public static long rawValueSum(Collection collection, Function getter) { + long sum = 0; + for (T item : collection) { + long value = getter.apply(item).getRawValue(); + sum = Math.addExact(sum, value); + } + return sum; + } + + /** + * @return The internal integer representation for this weight value + */ + @JsonValue + public long getRawValue() { + return value; + } + + @Override + public int hashCode() { + return Long.hashCode(value); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SplitWeight)) { + return false; + } + return this.value == ((SplitWeight) other).value; + } + + @Override + public String toString() { + if (value == UNIT_VALUE) { + return "1"; + } + return BigDecimal.valueOf(value, -UNIT_SCALE).stripTrailingZeros().toPlainString(); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java index 31b1e1515a..d42defe5c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java @@ -17,6 +17,10 @@ package org.apache.doris.spi; +import org.apache.doris.planner.external.SplitWeight; + +import java.util.List; + /** * Split interface. e.g. Tablet for Olap Table. */ @@ -26,4 +30,23 @@ public interface Split { Object getInfo(); + default SplitWeight getSplitWeight() { + return SplitWeight.standard(); + } + + default boolean isRemotelyAccessible() { + return true; + } + + String getPathString(); + + long getStart(); + + long getLength(); + + List getAlternativeHosts(); + + void setAlternativeHosts(List alternativeHosts); + } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index fcb5e63e83..47d9619b07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -622,7 +622,7 @@ public class Backend implements Writable { @Override public int hashCode() { - return Objects.hash(id, host, heartbeatPort, bePort, isAlive); + return Objects.hash(id, host, heartbeatPort, bePort, isAlive.get()); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index ef65d1b616..1d213c6a09 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -18,52 +18,55 @@ package org.apache.doris.planner; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.planner.external.FileSplit; +import org.apache.doris.planner.external.NodeSelectionStrategy; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileRangeDesc; 
-import org.apache.doris.thrift.TFileScanRange; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimap; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import org.junit.Before; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; public class FederationBackendPolicyTest { @Mocked private Env env; - @Before - public void setUp() { - + @Test + public void testRemoteSplits() throws UserException { SystemInfoService service = new SystemInfoService(); - for (int i = 0; i < 190; i++) { - Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050); - backend.setAlive(false); - service.addBackend(backend); - } + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); new MockUp() { @Mock @@ -72,76 +75,653 @@ public class FederationBackendPolicyTest { } }; + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new 
Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + FederationBackendPolicy policy = new FederationBackendPolicy(); + policy.init(); + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } } @Test - public void testGetNextBe() throws UserException { - FederationBackendPolicy policy = new FederationBackendPolicy(); - policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; - Assertions.assertEquals(policy.numBackends(), backendNum); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2.")); - } - sw.stop(); - System.out.println("Invoke getNextBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); - } + public void testHasLocalSplits() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(30002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(30003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(30004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + 
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); - @Test - public void testGetNextLocalBe() throws UserException { FederationBackendPolicy policy = new FederationBackendPolicy(); policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - List localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2"); - TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost())); + int totalSplitNum = 0; + List checkedLocalSplit = new ArrayList<>(); + Multimap assignment = policy.computeScanRangeAssignment(splits); + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + ++totalSplitNum; + if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.100", backend.getHost()); + checkedLocalSplit.add(true); + } else if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.106", backend.getHost()); + checkedLocalSplit.add(true); + } + } } - sw.stop(); - System.out.println("Invoke getNextLocalBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); + Assert.assertEquals(2, checkedLocalSplit.size()); + Assert.assertEquals(8, totalSplitNum); + + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } @Test public void testConsistentHash() throws UserException { - FederationBackendPolicy policy = new FederationBackendPolicy(); + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, 
"172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); policy.init(); - int backendNum = 200; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); - Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId()); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); - scanRangeLocations = getScanRangeLocations("path2", 0, 100); - Assertions.assertEquals(78, 
policy.getNextConsistentBe(scanRangeLocations).getId()); } - private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size)); - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - return locations; + @Test + public void testGenerateRandomly() throws UserException { + SystemInfoService service = new SystemInfoService(); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + Random random = new Random(); + int backendNum = random.nextInt(100 - 1) + 1; + + int minOctet3 = 0; + int maxOctet3 = 250; + int minOctet4 = 1; + int maxOctet4 = 250; + Set backendIds = new HashSet<>(); + Set ipAddresses = new HashSet<>(); + for (int i = 0; i < backendNum; i++) { + String ipAddress; + do { + int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3; + int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4; + ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4; + } while (!ipAddresses.add(ipAddress)); + + int backendId; + do { + backendId = random.nextInt(90000) + 10000; + } while (!backendIds.add(backendId)); + + Backend backend = new Backend(backendId, ipAddress, 9050); + backend.setAlive(true); + service.addBackend(backend); + } + + List remoteSplits = new ArrayList<>(); + int splitCount = random.nextInt(1000 - 100) + 100; + for (int i = 0; i < splitCount; ++i) { + long splitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, splitLength, splitLength, 0, null, Collections.emptyList()); + remoteSplits.add(split); + } + + List localSplits = new ArrayList<>(); + int localSplitCount = random.nextInt(1000 - 100) + 100; + Set totalLocalHosts = new HashSet<>(); + for (int i = 0; i < localSplitCount; ++i) { + int localHostNum = random.nextInt(3 - 1) + 1; + Set localHosts = new HashSet<>(); + String localHost; + for (int j = 0; j < localHostNum; ++j) { + do { + localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + } while (!localHosts.add(localHost)); + totalLocalHosts.add(localHost); + } + long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), + Collections.emptyList()); + localSplits.add(split); + } + + ListMultimap result = null; + // Run 3 times to ensure the same results + for (int i = 0; i < 3; ++i) { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + Assertions.assertEquals(policy.numBackends(), backendNum); + int totalSplitNum = 0; + + List totalSplits = new ArrayList<>(); + totalSplits.addAll(remoteSplits); + totalSplits.addAll(localSplits); + Collections.shuffle(totalSplits); + Multimap assignment = 
policy.computeScanRangeAssignment(totalSplits); + if (i == 0) { + result = ArrayListMultimap.create(assignment); + } else { + Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment)); + + } + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + ++totalSplitNum; + if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) { + for (String host : fileSplit.getHosts()) { + Assert.assertTrue(totalLocalHosts.contains(host)); + } + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertEquals(totalSplits.size(), totalSplitNum); + + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } } - private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setPath(path); - rangeDesc.setStartOffset(startOffset); - rangeDesc.setSize(size); - return rangeDesc; + @Test + public void testNonAliveNodes() throws UserException { + SystemInfoService service = new SystemInfoService(); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + Random random = new Random(); + int backendNum = random.nextInt(100 - 1) + 1; + + int minOctet3 = 0; + int maxOctet3 = 250; + int minOctet4 = 1; + int maxOctet4 = 250; + Set backendIds = new HashSet<>(); + Set ipAddresses = new HashSet<>(); + int aliveBackendNum = 0; + for (int i = 0; i < backendNum; i++) { + String ipAddress; + do { + int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3; + int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4; + ipAddress = 192 + "." + 168 + "." + octet3 + "." 
+ octet4; + } while (!ipAddresses.add(ipAddress)); + + int backendId; + do { + backendId = random.nextInt(90000) + 10000; + } while (!backendIds.add(backendId)); + + Backend backend = new Backend(backendId, ipAddress, 9050); + if (i % 2 == 0) { + ++aliveBackendNum; + backend.setAlive(true); + } else { + backend.setAlive(false); + } + service.addBackend(backend); + } + + List remoteSplits = new ArrayList<>(); + int splitCount = random.nextInt(1000 - 100) + 100; + for (int i = 0; i < splitCount; ++i) { + long splitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, splitLength, splitLength, 0, null, Collections.emptyList()); + remoteSplits.add(split); + } + + List localSplits = new ArrayList<>(); + int localSplitCount = random.nextInt(1000 - 100) + 100; + Set totalLocalHosts = new HashSet<>(); + for (int i = 0; i < localSplitCount; ++i) { + int localHostNum = random.nextInt(3 - 1) + 1; + Set localHosts = new HashSet<>(); + String localHost; + for (int j = 0; j < localHostNum; ++j) { + do { + localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + } while (!localHosts.add(localHost)); + totalLocalHosts.add(localHost); + } + long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), + Collections.emptyList()); + localSplits.add(split); + } + + Multimap result = null; + // Run 3 times to ensure the same results + for (int i = 0; i < 3; ++i) { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + Assertions.assertEquals(policy.numBackends(), aliveBackendNum); + int totalSplitNum = 0; + List totalSplits = new ArrayList<>(); + totalSplits.addAll(remoteSplits); + totalSplits.addAll(localSplits); + Collections.shuffle(totalSplits); + Multimap assignment = policy.computeScanRangeAssignment(totalSplits); + if (i == 0) { + result = ArrayListMultimap.create(assignment); + } else { + Assertions.assertEquals(result, assignment); + } + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + ++totalSplitNum; + if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) { + for (String host : fileSplit.getHosts()) { + Assert.assertTrue(totalLocalHosts.contains(host)); + } + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertEquals(totalSplits.size(), totalSplitNum); + + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } + } + + private static class TestSplitHashKey { + private String path; + private long start; + private long length; + + public TestSplitHashKey(String path, long start, long length) { + 
this.path = path; + this.start = start; + this.length = length; + } + + public String getPath() { + return path; + } + + public long getStart() { + return start; + } + + public long getLength() { + return length; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestSplitHashKey that = (TestSplitHashKey) o; + return start == that.start && length == that.length && Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(path, start, length); + } + } + + @Test + public void testConsistentHashWhenNodeChanged() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + Map originSplitAssignedBackends = new HashMap<>(); + { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + // Set these options to ensure that the consistent hash algorithm is consistent. 
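+            // With splits redistribution disabled and a single consistent-hash
+            // candidate per split, each split is pinned to one position on the
+            // hash ring, so the move ratios asserted below depend only on the
+            // ring changes caused by dropping or adding a backend.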
+ policy.setEnableSplitsRedistribution(false); + Config.split_assigner_min_consistent_hash_candidate_num = 1; + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + originSplitAssignedBackends.put( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()), backend); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + } + + // remove a node + { + service.dropBackend(backend3.getId()); + int changed = 0; + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + int backendNum = 2; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + Backend origin = originSplitAssignedBackends.get( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength())); + if (!backend.equals(origin)) { + changed += 1; + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + + float moveRatio = changed * 1.0f / assignment.values().size(); + System.out.printf("Remove a node: move ratio = %.2f\n", moveRatio); + Assertions.assertEquals(0.375, moveRatio); + } + + // add a node + { + Backend backend4 = new Backend(10004L, "172.30.0.128", 9050); + backend4.setAlive(true); + service.addBackend(backend4); + int changed = 0; + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + Backend origin = originSplitAssignedBackends.get( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength())); + if (!backend.equals(origin)) { + changed += 1; + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + + float moveRatio = changed * 1.0f / assignment.values().size(); + System.out.printf("Add a node, move ratio = %.2f\n", moveRatio); + Assertions.assertEquals(0.25, moveRatio); + } + } + + private static 
boolean areMultimapsEqualIgnoringOrder(
+            Multimap<Backend, Split> multimap1, Multimap<Backend, Split> multimap2) {
+        Collection<Map.Entry<Backend, Split>> entries1 = multimap1.entries();
+        Collection<Map.Entry<Backend, Split>> entries2 = multimap2.entries();
+
+        return entries1.containsAll(entries2) && entries2.containsAll(entries1);
+    }
 }
+
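For reference, areMultimapsEqualIgnoringOrder above compares the two entry views with containsAll in both directions. That is sufficient for these tests, where every FileSplit instance appears at most once per assignment, but it would treat two multimaps that differ only in how many times they contain the same (backend, split) entry as equal. A minimal, multiplicity-aware sketch, assuming Guava's HashMultiset (Guava is already on the test classpath alongside ArrayListMultimap), could look like the following; the class and method names are illustrative only and not part of the change above.

// Illustrative sketch, not part of the patch: an order-insensitive multimap
// comparison that also checks entry multiplicity.
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multimap;

public final class MultimapCompareSketch {
    private MultimapCompareSketch() {
    }

    // Multiset equality compares elements together with their occurrence counts,
    // so two assignments containing the same (key, value) entry a different number
    // of times are reported as unequal, unlike the containsAll-based check.
    public static <K, V> boolean equalIgnoringOrder(Multimap<K, V> first, Multimap<K, V> second) {
        return HashMultiset.create(first.entries()).equals(HashMultiset.create(second.entries()));
    }
}

Because each split here is a distinct object, swapping areMultimapsEqualIgnoringOrder(result, assignment) for MultimapCompareSketch.equalIgnoringOrder(result, assignment) would not change the outcome of these tests.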