From 383850ef12b07b9323276314b26ec6da7e792e86 Mon Sep 17 00:00:00 2001
From: Qi Chen
Date: Sun, 4 Feb 2024 11:13:29 +0800
Subject: [PATCH] [Opt](multi-catalog) Opt split assignment to resolve uneven
 distribution. (#30390)

[Opt] (multi-catalog) Opt split assignment to resolve uneven distribution.

Currently only for `FileQueryScanNode`.

Referring to the implementation of Trino:
- Local node soft affinity optimization: prefer the local replica node whenever possible.
- Remote splits are assigned with the consistent hash algorithm when the file cache is
  turned on. Because consistent hashing may produce an uneven distribution, the splits are
  then re-adjusted so that the maximum and minimum split counts across hosts differ by at
  most `max_split_num_variance` splits.
- Remote splits are assigned with the round-robin algorithm when the file cache is turned off.
---
 .../java/org/apache/doris/common/Config.java  |  23 +-
 .../doris/common/IndexedPriorityQueue.java    | 228 ++++++
 .../java/org/apache/doris/common/Queue.java   |  36 +
 .../common/ResettableRandomizedIterator.java  |  63 ++
 .../doris/common/UpdateablePriorityQueue.java |  27 +
 .../doris/common/util/ConsistentHash.java     |  39 +-
 .../planner/external/ExternalScanNode.java    |   6 +-
 .../external/FederationBackendPolicy.java     | 361 ++++++++-
 .../planner/external/FileQueryScanNode.java   | 127 ++-
 .../doris/planner/external/FileSplit.java     |   7 +
 .../external/NodeSelectionStrategy.java       |  25 +
 .../doris/planner/external/SplitWeight.java   | 130 ++++
 .../main/java/org/apache/doris/spi/Split.java |  23 +
 .../java/org/apache/doris/system/Backend.java |   2 +-
 .../planner/FederationBackendPolicyTest.java  | 736 ++++++++++++++++--
 15 files changed, 1649 insertions(+), 184 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 4fcd815072..be58c13985 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2239,7 +2239,28 @@ public class Config extends ConfigBase {
             "When file cache is enabled, the number of virtual nodes of each node in the consistent hash algorithm. "
                     + "The larger the value, the more uniform the distribution of the hash algorithm, "
                     + "but it will increase the memory overhead."})
-    public static int virtual_node_number = 2048;
+    public static int split_assigner_virtual_node_number = 256;
+
+    @ConfField(mutable = true, description = {
+            "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。",
+            "Local node soft affinity optimization. Prefer the local replica node whenever possible."})
+    public static boolean split_assigner_optimized_local_scheduling = true;
+
+    @ConfField(mutable = true, description = {
+            "随机算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the random algorithm; the relatively most idle node among them will be selected."})
+    public static int split_assigner_min_random_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the consistent hash algorithm; the relatively most idle node among them will be selected."})
+    public static int split_assigner_min_consistent_hash_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。",
+            "The maximum difference in the number of splits between nodes. "
+                    + "If this number is exceeded, the splits will be redistributed."})
+    public static int split_assigner_max_split_num_variance = 1;
 
     @ConfField(description = {
             "控制统计信息的自动触发作业执行记录的持久化行数",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java
new file mode 100644
index 0000000000..f93db510e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * A priority queue with constant time contains(E) and log time remove(E) + * Ties are broken by insertion order + */ +public final class IndexedPriorityQueue + implements UpdateablePriorityQueue { + private final Map> index = new HashMap<>(); + private final Set> queue; + private long generation; + + public IndexedPriorityQueue() { + this(PriorityOrdering.HIGH_TO_LOW); + } + + public IndexedPriorityQueue(PriorityOrdering priorityOrdering) { + switch (priorityOrdering) { + case LOW_TO_HIGH: + queue = new TreeSet<>( + Comparator.comparingLong((Entry entry) -> entry.getPriority()) + .thenComparingLong(Entry::getGeneration)); + break; + case HIGH_TO_LOW: + queue = new TreeSet<>((entry1, entry2) -> { + int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority()); + if (priorityComparison != 0) { + return priorityComparison; + } + return Long.compare(entry1.getGeneration(), entry2.getGeneration()); + }); + break; + default: + throw new IllegalArgumentException(); + } + } + + @Override + public boolean addOrUpdate(E element, long priority) { + Entry entry = index.get(element); + if (entry != null) { + if (entry.getPriority() == priority) { + return false; + } + queue.remove(entry); + Entry newEntry = new Entry<>(element, priority, entry.getGeneration()); + queue.add(newEntry); + index.put(element, newEntry); + return false; + } + Entry newEntry = new Entry<>(element, priority, generation); + generation++; + queue.add(newEntry); + index.put(element, newEntry); + return true; + } + + @Override + public boolean contains(E element) { + return index.containsKey(element); + } + + @Override + public boolean remove(E element) { + Entry entry = index.remove(element); + if (entry != null) { + queue.remove(entry); + return true; + } + return false; + } + + @Override + public E poll() { + Entry entry = pollEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public E peek() { + Entry entry = peekEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + public Prioritized getPrioritized(E element) { + Entry entry = index.get(element); + if (entry == null) { + return null; + } + + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Prioritized pollPrioritized() { + Entry entry = pollEntry(); + if (entry == null) { + return null; + } + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + private Entry pollEntry() { + Iterator> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + Entry entry = iterator.next(); + iterator.remove(); + Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index"); + return entry; + } + + public Prioritized peekPrioritized() { + Entry entry = peekEntry(); + if (entry == null) { + return null; + } + return new 
Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Entry peekEntry() { + Iterator> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + return iterator.next(); + } + + @Override + public Iterator iterator() { + return Iterators.transform(queue.iterator(), Entry::getValue); + } + + public enum PriorityOrdering { + LOW_TO_HIGH, + HIGH_TO_LOW + } + + private static final class Entry { + private final E value; + private final long priority; + private final long generation; + + private Entry(E value, long priority, long generation) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + this.generation = generation; + } + + public E getValue() { + return value; + } + + public long getPriority() { + return priority; + } + + public long getGeneration() { + return generation; + } + } + + public static class Prioritized { + private final V value; + private final long priority; + + public Prioritized(V value, long priority) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + } + + public V getValue() { + return value; + } + + public long getPriority() { + return priority; + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java new file mode 100644 index 0000000000..6dc58a3523 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java +// and modified by Doris + +package org.apache.doris.common; + +interface Queue { + boolean contains(E element); + + boolean remove(E element); + + E poll(); + + E peek(); + + int size(); + + boolean isEmpty(); +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java new file mode 100644 index 0000000000..11d1860435 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java +// and modified by Doris + +package org.apache.doris.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; + +public class ResettableRandomizedIterator + implements Iterator { + private final List list; + private int position; + + public ResettableRandomizedIterator(Collection elements) { + this.list = new ArrayList<>(elements); + } + + public void reset() { + position = 0; + } + + @Override + public boolean hasNext() { + return position < list.size(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + int position = ThreadLocalRandom.current().nextInt(this.position, list.size()); + + T result = list.set(position, list.get(this.position)); + list.set(this.position, result); + this.position++; + + return result; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java new file mode 100644 index 0000000000..c9c0a2b3de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/UpdateablePriorityQueue.java +// and modified by Doris + +package org.apache.doris.common; + +interface UpdateablePriorityQueue + extends Queue, Iterable { + boolean addOrUpdate(E element, long priority); +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java index 4ef6e4168f..85199ad32f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java @@ -17,11 +17,16 @@ package org.apache.doris.common.util; +import com.google.common.collect.ImmutableList; import com.google.common.hash.Funnel; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -89,14 +94,38 @@ public class ConsistentHash { } } - public N getNode(K key) { - if (ring.isEmpty()) { - return null; + public List getNode(K key, int count) { + int nodeCount = ring.values().size(); + if (count > nodeCount) { + count = nodeCount; } + + Set uniqueNodes = new LinkedHashSet<>(); + Hasher hasher = hashFunction.newHasher(); Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong(); + SortedMap tailMap = ring.tailMap(hashKey); - hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey(); - return ring.get(hashKey).getNode(); + // Start reading from tail + for (Map.Entry entry : tailMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + + if (uniqueNodes.size() < count) { + // Start reading from the head as we have exhausted tail + SortedMap headMap = ring.headMap(hashKey); + for (Map.Entry entry : headMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + } + + return ImmutableList.copyOf(uniqueNodes); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java index a4751b5205..cbfe318881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.UserException; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TScanRangeLocations; @@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode { // set to false means this scan node does not need to check column priv. protected boolean needCheckColumnPriv; - protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); + protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableFileCache) + ? 
new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode { return scanRangeLocations.size(); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index ade03291c3..df0e955ea8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -14,28 +14,38 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is referenced from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java +// and modified by Doris package org.apache.doris.planner.external; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.IndexedPriorityQueue; +import org.apache.doris.common.ResettableRandomizedIterator; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.system.SystemInfoService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; @@ -45,12 +55,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -59,35 +73,47 @@ public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); private final List backends = Lists.newArrayList(); private final Map> backendMap = Maps.newHashMap(); - private final SecureRandom random = new SecureRandom(); - private ConsistentHash consistentHash; + + public Map getAssignedWeightPerBackend() { + return assignedWeightPerBackend; + } + + private 
Map assignedWeightPerBackend = Maps.newHashMap(); + + private ConsistentHash consistentHash; private int nextBe = 0; private boolean initialized = false; + private NodeSelectionStrategy nodeSelectionStrategy; + private boolean enableSplitsRedistribution = true; + // Create a ConsistentHash ring may be a time-consuming operation, so we cache it. - private static LoadingCache> consistentHashCache; + private static LoadingCache> consistentHashCache; static { consistentHashCache = CacheBuilder.newBuilder().maximumSize(5) - .build(new CacheLoader>() { + .build(new CacheLoader>() { @Override - public ConsistentHash load(HashCacheKey key) { - return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(), - new BackendHash(), key.bes, Config.virtual_node_number); + public ConsistentHash load(HashCacheKey key) { + return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(), + new BackendHash(), key.bes, Config.split_assigner_virtual_node_number); } }); } private static class HashCacheKey { // sorted backend ids as key - private List beIds; + private List beHashKeys; // backends is not part of key, just an attachment private List bes; HashCacheKey(List backends) { this.bes = backends; - this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList()); + this.beHashKeys = backends.stream().map(b -> + String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort())) + .sorted() + .collect(Collectors.toList()); } @Override @@ -98,20 +124,28 @@ public class FederationBackendPolicy { if (!(obj instanceof HashCacheKey)) { return false; } - return Objects.equals(beIds, ((HashCacheKey) obj).beIds); + return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys); } @Override public int hashCode() { - return Objects.hash(beIds); + return Objects.hash(beHashKeys); } @Override public String toString() { - return "HashCache{" + "beIds=" + beIds + '}'; + return "HashCache{" + "beHashKeys=" + beHashKeys + '}'; } } + public FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) { + this.nodeSelectionStrategy = nodeSelectionStrategy; + } + + public FederationBackendPolicy() { + this(NodeSelectionStrategy.ROUND_ROBIN); + } + public void init() throws UserException { if (!initialized) { init(Collections.emptyList()); @@ -152,6 +186,10 @@ public class FederationBackendPolicy { if (backends.isEmpty()) { throw new UserException("No available backends"); } + for (Backend backend : backends) { + assignedWeightPerBackend.put(backend, 0L); + } + backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); try { consistentHash = consistentHashCache.get(new HashCacheKey(backends)); @@ -166,23 +204,280 @@ public class FederationBackendPolicy { return selectedBackend; } - public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) { - return consistentHash.getNode(scanRangeLocations); + @VisibleForTesting + public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) { + this.enableSplitsRedistribution = enableSplitsRedistribution; } - // Try to find a local BE, if not exists, use `getNextBe` instead - public Backend getNextLocalBe(List hosts, TScanRangeLocations scanRangeLocations) { - List candidateBackends = Lists.newArrayListWithCapacity(hosts.size()); + public Multimap computeScanRangeAssignment(List splits) throws UserException { + // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible. 
+ splits.sort((split1, split2) -> { + int pathComparison = split1.getPathString().compareTo(split2.getPathString()); + if (pathComparison != 0) { + return pathComparison; + } + + int startComparison = Long.compare(split1.getStart(), split2.getStart()); + if (startComparison != 0) { + return startComparison; + } + return Long.compare(split1.getLength(), split2.getLength()); + }); + + ListMultimap assignment = ArrayListMultimap.create(); + + List remainingSplits = null; + + List backends = new ArrayList<>(); + for (List backendList : backendMap.values()) { + backends.addAll(backendList); + } + ResettableRandomizedIterator randomCandidates = new ResettableRandomizedIterator<>(backends); + + boolean splitsToBeRedistributed = false; + + // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain + // locality information + if (Config.split_assigner_optimized_local_scheduling) { + remainingSplits = new ArrayList<>(splits.size()); + for (int i = 0; i < splits.size(); ++i) { + Split split = splits.get(i); + if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) { + List candidateNodes = selectExactNodes(backendMap, split.getHosts()); + + Optional chosenNode = candidateNodes.stream() + .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode))); + + if (chosenNode.isPresent()) { + Backend selectedBackend = chosenNode.get(); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + splitsToBeRedistributed = true; + continue; + } + } + remainingSplits.add(split); + } + } else { + remainingSplits = splits; + } + + for (Split split : remainingSplits) { + List candidateNodes; + if (!split.isRemotelyAccessible()) { + candidateNodes = selectExactNodes(backendMap, split.getHosts()); + } else { + switch (nodeSelectionStrategy) { + case ROUND_ROBIN: { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + candidateNodes = ImmutableList.of(selectedBackend); + break; + } + case RANDOM: { + randomCandidates.reset(); + candidateNodes = selectNodes(Config.split_assigner_min_random_candidate_num, randomCandidates); + break; + } + case CONSISTENT_HASHING: { + candidateNodes = consistentHash.getNode(split, + Config.split_assigner_min_consistent_hash_candidate_num); + splitsToBeRedistributed = true; + break; + } + default: { + throw new RuntimeException(); + } + } + } + if (candidateNodes.isEmpty()) { + LOG.debug("No nodes available to schedule {}. Available nodes {}", split, + backends); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); + } + + Backend selectedBackend = chooseNodeForSplit(candidateNodes); + List alternativeBackends = new ArrayList<>(candidateNodes); + alternativeBackends.remove(selectedBackend); + split.setAlternativeHosts( + alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList())); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + } + + if (enableSplitsRedistribution && splitsToBeRedistributed) { + equateDistribution(assignment); + } + return assignment; + } + + /** + * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and + * a minHeap based on the number of splits that are assigned to them. 
Splits are redistributed, one at a time, + * from a maxNode to a minNode until we have as uniform a distribution as possible. + * + * @param assignment the node-splits multimap after the first and the second stage + */ + private void equateDistribution(ListMultimap assignment) { + if (assignment.isEmpty()) { + return; + } + + List allNodes = new ArrayList<>(); + for (List backendList : backendMap.values()) { + allNodes.addAll(backendList); + } + Collections.sort(allNodes, Comparator.comparing(Backend::getId)); + + if (allNodes.size() < 2) { + return; + } + + IndexedPriorityQueue maxNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node)); + } + + IndexedPriorityQueue minNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node)); + } + + while (true) { + if (maxNodes.isEmpty()) { + return; + } + + // fetch min and max node + Backend maxNode = maxNodes.poll(); + Backend minNode = minNodes.poll(); + + // Allow some degree of non uniformity when assigning splits to nodes. Usually data distribution + // among nodes in a cluster won't be fully uniform (e.g. because hash function with non-uniform + // distribution is used like consistent hashing). In such case it makes sense to assign splits to nodes + // with data because of potential savings in network throughput and CPU time. + if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode) + <= SplitWeight.rawValueForStandardSplitCount(Config.split_assigner_max_split_num_variance)) { + return; + } + + // move split from max to min + Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode); + + assignedWeightPerBackend.put(maxNode, + assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue()); + assignedWeightPerBackend.put(minNode, Math.addExact( + assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue())); + + // add max back into maxNodes only if it still has assignments + if (assignment.containsKey(maxNode)) { + maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode)); + } + + // Add or update both the Priority Queues with the updated node priorities + maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode)); + } + } + + /** + * The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to + * redistribute a Non-local split if possible. This case is possible when there are multiple queries running + * simultaneously. If a Non-local split cannot be found in the maxNode, next split is selected and reassigned. 
+ */ + @VisibleForTesting + public static Split redistributeSplit(Multimap assignment, Backend fromNode, + Backend toNode) { + Iterator splitIterator = assignment.get(fromNode).iterator(); + Split splitToBeRedistributed = null; + while (splitIterator.hasNext()) { + Split split = splitIterator.next(); + // Try to select non-local split for redistribution + if (split.getHosts() != null && !isSplitLocal( + split.getHosts(), fromNode.getHost())) { + splitToBeRedistributed = split; + break; + } + } + // Select split if maxNode has no non-local splits in the current batch of assignment + if (splitToBeRedistributed == null) { + splitIterator = assignment.get(fromNode).iterator(); + while (splitIterator.hasNext()) { + splitToBeRedistributed = splitIterator.next(); + // if toNode has split replication, transfer this split firstly + if (splitToBeRedistributed.getHosts() != null && isSplitLocal( + splitToBeRedistributed.getHosts(), toNode.getHost())) { + break; + } + // if toNode is split alternative host, transfer this split firstly + if (splitToBeRedistributed.getAlternativeHosts() != null && isSplitLocal( + splitToBeRedistributed.getAlternativeHosts(), toNode.getHost())) { + break; + } + } + } + splitIterator.remove(); + assignment.put(toNode, splitToBeRedistributed); + return splitToBeRedistributed; + } + + private static boolean isSplitLocal(String[] splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + private static boolean isSplitLocal(List splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + public static List selectExactNodes(Map> backendMap, String[] hosts) { + Set chosen = new LinkedHashSet<>(); + for (String host : hosts) { - List backends = backendMap.get(host); - if (CollectionUtils.isNotEmpty(backends)) { - candidateBackends.add(backends.get(random.nextInt(backends.size()))); + if (backendMap.containsKey(host)) { + backendMap.get(host).stream() + .forEach(chosen::add); + } + } + return ImmutableList.copyOf(chosen); + } + + public static List selectNodes(int limit, Iterator candidates) { + Preconditions.checkArgument(limit > 0, "limit must be at least 1"); + + List selected = new ArrayList<>(limit); + while (selected.size() < limit && candidates.hasNext()) { + selected.add(candidates.next()); + } + + return selected; + } + + private Backend chooseNodeForSplit(List candidateNodes) { + Backend chosenNode = null; + long minWeight = Long.MAX_VALUE; + + for (Backend node : candidateNodes) { + long queuedWeight = assignedWeightPerBackend.get(node); + if (queuedWeight <= minWeight) { + chosenNode = node; + minWeight = queuedWeight; } } - return CollectionUtils.isEmpty(candidateBackends) - ? 
getNextConsistentBe(scanRangeLocations) - : candidateBackends.get(random.nextInt(candidateBackends.size())); + return chosenNode; } public int numBackends() { @@ -200,15 +495,13 @@ public class FederationBackendPolicy { } } - private static class ScanRangeHash implements Funnel { + private static class SplitHash implements Funnel { @Override - public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) { - Preconditions.checkState(scanRange.scan_range.isSetExtScanRange()); - for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) { - primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8)); - primitiveSink.putLong(desc.start_offset); - primitiveSink.putLong(desc.size); - } + public void funnel(Split split, PrimitiveSink primitiveSink) { + primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putLong(split.getStart()); + primitiveSink.putLong(split.getLength()); } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 4a3c3d2ff9..a374df5e60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -68,13 +68,14 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -314,76 +315,73 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); } - boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties); List pathPartitionKeys = getPathPartitionKeys(); - for (Split split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - TFileType locationType; - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } - TScanRangeLocations curLocations = newLocations(); - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. - boolean isACID = false; - if (fileSplit instanceof HiveSplit) { - HiveSplit hiveSplit = (HiveSplit) split; - isACID = hiveSplit.isACID(); - } - List partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) - : fileSplit.getPartitionValues(); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) split; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); + Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); + for (Backend backend : assignment.keySet()) { + Collection splits = assignment.get(backend); + for (Split split : splits) { + FileSplit fileSplit = (FileSplit) split; + TFileType locationType; + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } else { + locationType = getLocationType(fileSplit.getPath().toString()); } - transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - setScanParams(rangeDesc, fileSplit); + TScanRangeLocations curLocations = newLocations(); + // If fileSplit has partition values, use the values collected from hive partitions. + // Otherwise, use the values in file path. + boolean isACID = false; + if (fileSplit instanceof HiveSplit) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + isACID = hiveSplit.isACID(); + } + List partitionValuesFromPath = fileSplit.getPartitionValues() == null + ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, + false, isACID) : fileSplit.getPartitionValues(); - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend; - if (enableShortCircuitRead) { - // Try to find a local BE if enable hdfs short circuit read - selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations); - } else { - // Use consistent hash to assign the same scan range into the same backend among different queries - selectedBackend = backendPolicy.getNextConsistentBe(curLocations); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, + locationType); + TFileCompressType fileCompressType = getFileCompressType(fileSplit); + rangeDesc.setCompressType(fileCompressType); + if (isACID) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); + AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); + TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); + transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); + List deleteDeltaDescs = new ArrayList<>(); + for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { + TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); + deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); + deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); + deleteDeltaDescs.add(deleteDeltaDesc); + } + transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); + tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + + setScanParams(rangeDesc, fileSplit); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + TScanRangeLocation location = new TScanRangeLocation(); + setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + curLocations.addToLocations(location); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + scanRangeLocations.add(curLocations); + this.totalFileSize += fileSplit.getLength(); } - setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties); - location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - curLocations.addToLocations(location); - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), - fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); - scanRangeLocations.add(curLocations); - this.totalFileSize += fileSplit.getLength(); } + if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime(); } @@ -518,3 +516,4 @@ 
public abstract class FileQueryScanNode extends FileScanNode { protected abstract Map getLocationProperties() throws UserException; } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java index 4e9d0bae56..1221d3ad21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -42,6 +42,8 @@ public class FileSplit implements Split { // partitionValues would be ["part1", "part2"] protected List partitionValues; + protected List alternativeHosts; + public FileSplit(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts, List partitionValues) { this.path = path; @@ -67,6 +69,11 @@ public class FileSplit implements Split { return null; } + @Override + public String getPathString() { + return path.toString(); + } + public static class FileSplitCreator implements SplitCreator { public static final FileSplitCreator DEFAULT = new FileSplitCreator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java new file mode 100644 index 0000000000..4b4d10a424 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +public enum NodeSelectionStrategy { + ROUND_ROBIN, + RANDOM, + CONSISTENT_HASHING +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java new file mode 100644 index 0000000000..d8724017b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java +// and modified by Doris + +package org.apache.doris.planner.external; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.errorprone.annotations.DoNotCall; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.function.Function; + +public final class SplitWeight { + private static final long UNIT_VALUE = 100; + private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE + private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE); + + private final long value; + + private SplitWeight(long value) { + if (value <= 0) { + throw new IllegalArgumentException("value must be > 0, found: " + value); + } + this.value = value; + } + + /** + * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended + * primarily for JSON deserialization, and connectors should use not call this factory method directly + * to construct {@link SplitWeight} instances. Instead, connectors should use + * {@link SplitWeight#fromProportion(double)} + * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future. + */ + @JsonCreator + @DoNotCall // For JSON serialization only + public static SplitWeight fromRawValue(long value) { + return fromRawValueInternal(value); + } + + /** + * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight + * proportionally, i.e., a parameter of 1.0 will be equivalent to the standard weight + * and a value of 0.5 will be 1/2 of the standard split weight. Valid arguments + * must be greater than zero and finite. Connectors should prefer constructing split weights + * using this factory method rather than passing a raw integer value in case the integer representation + * of a standard split needs to change in the future. + * + * @param weight the proportional weight relative to a standard split, expressed as a double + * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion + */ + public static SplitWeight fromProportion(double weight) { + if (weight <= 0 || !Double.isFinite(weight)) { + throw new IllegalArgumentException("Invalid weight: " + weight); + } + // Must round up to avoid small weights rounding to 0 + return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE)); + } + + private static SplitWeight fromRawValueInternal(long value) { + return value == UNIT_VALUE ? 
STANDARD_WEIGHT : new SplitWeight(value); + } + + public static SplitWeight standard() { + return STANDARD_WEIGHT; + } + + public static long rawValueForStandardSplitCount(int splitCount) { + if (splitCount < 0) { + throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount); + } + return Math.multiplyExact(splitCount, UNIT_VALUE); + } + + public static long rawValueSum(Collection collection, Function getter) { + long sum = 0; + for (T item : collection) { + long value = getter.apply(item).getRawValue(); + sum = Math.addExact(sum, value); + } + return sum; + } + + /** + * @return The internal integer representation for this weight value + */ + @JsonValue + public long getRawValue() { + return value; + } + + @Override + public int hashCode() { + return Long.hashCode(value); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SplitWeight)) { + return false; + } + return this.value == ((SplitWeight) other).value; + } + + @Override + public String toString() { + if (value == UNIT_VALUE) { + return "1"; + } + return BigDecimal.valueOf(value, -UNIT_SCALE).stripTrailingZeros().toPlainString(); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java index 31b1e1515a..d42defe5c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java @@ -17,6 +17,10 @@ package org.apache.doris.spi; +import org.apache.doris.planner.external.SplitWeight; + +import java.util.List; + /** * Split interface. e.g. Tablet for Olap Table. */ @@ -26,4 +30,23 @@ public interface Split { Object getInfo(); + default SplitWeight getSplitWeight() { + return SplitWeight.standard(); + } + + default boolean isRemotelyAccessible() { + return true; + } + + String getPathString(); + + long getStart(); + + long getLength(); + + List getAlternativeHosts(); + + void setAlternativeHosts(List alternativeHosts); + } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index fcb5e63e83..47d9619b07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -622,7 +622,7 @@ public class Backend implements Writable { @Override public int hashCode() { - return Objects.hash(id, host, heartbeatPort, bePort, isAlive); + return Objects.hash(id, host, heartbeatPort, bePort, isAlive.get()); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index ef65d1b616..1d213c6a09 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -18,52 +18,55 @@ package org.apache.doris.planner; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.planner.external.FileSplit; +import org.apache.doris.planner.external.NodeSelectionStrategy; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileRangeDesc; 
-import org.apache.doris.thrift.TFileScanRange; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimap; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import org.junit.Before; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; public class FederationBackendPolicyTest { @Mocked private Env env; - @Before - public void setUp() { - + @Test + public void testRemoteSplits() throws UserException { SystemInfoService service = new SystemInfoService(); - for (int i = 0; i < 190; i++) { - Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050); - backend.setAlive(false); - service.addBackend(backend); - } + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); new MockUp() { @Mock @@ -72,76 +75,653 @@ public class FederationBackendPolicyTest { } }; + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new 
Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + FederationBackendPolicy policy = new FederationBackendPolicy(); + policy.init(); + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } } @Test - public void testGetNextBe() throws UserException { - FederationBackendPolicy policy = new FederationBackendPolicy(); - policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; - Assertions.assertEquals(policy.numBackends(), backendNum); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2.")); - } - sw.stop(); - System.out.println("Invoke getNextBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); - } + public void testHasLocalSplits() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(30002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(30003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(30004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + 
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); - @Test - public void testGetNextLocalBe() throws UserException { FederationBackendPolicy policy = new FederationBackendPolicy(); policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - List localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2"); - TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost())); + int totalSplitNum = 0; + List checkedLocalSplit = new ArrayList<>(); + Multimap assignment = policy.computeScanRangeAssignment(splits); + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + ++totalSplitNum; + if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.100", backend.getHost()); + checkedLocalSplit.add(true); + } else if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.106", backend.getHost()); + checkedLocalSplit.add(true); + } + } } - sw.stop(); - System.out.println("Invoke getNextLocalBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); + Assert.assertEquals(2, checkedLocalSplit.size()); + Assert.assertEquals(8, totalSplitNum); + + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } @Test public void testConsistentHash() throws UserException { - FederationBackendPolicy policy = new FederationBackendPolicy(); + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, 
"172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); policy.init(); - int backendNum = 200; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); - Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId()); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); - scanRangeLocations = getScanRangeLocations("path2", 0, 100); - Assertions.assertEquals(78, 
policy.getNextConsistentBe(scanRangeLocations).getId()); } - private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size)); - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - return locations; + @Test + public void testGenerateRandomly() throws UserException { + SystemInfoService service = new SystemInfoService(); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + Random random = new Random(); + int backendNum = random.nextInt(100 - 1) + 1; + + int minOctet3 = 0; + int maxOctet3 = 250; + int minOctet4 = 1; + int maxOctet4 = 250; + Set backendIds = new HashSet<>(); + Set ipAddresses = new HashSet<>(); + for (int i = 0; i < backendNum; i++) { + String ipAddress; + do { + int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3; + int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4; + ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4; + } while (!ipAddresses.add(ipAddress)); + + int backendId; + do { + backendId = random.nextInt(90000) + 10000; + } while (!backendIds.add(backendId)); + + Backend backend = new Backend(backendId, ipAddress, 9050); + backend.setAlive(true); + service.addBackend(backend); + } + + List remoteSplits = new ArrayList<>(); + int splitCount = random.nextInt(1000 - 100) + 100; + for (int i = 0; i < splitCount; ++i) { + long splitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, splitLength, splitLength, 0, null, Collections.emptyList()); + remoteSplits.add(split); + } + + List localSplits = new ArrayList<>(); + int localSplitCount = random.nextInt(1000 - 100) + 100; + Set totalLocalHosts = new HashSet<>(); + for (int i = 0; i < localSplitCount; ++i) { + int localHostNum = random.nextInt(3 - 1) + 1; + Set localHosts = new HashSet<>(); + String localHost; + for (int j = 0; j < localHostNum; ++j) { + do { + localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + } while (!localHosts.add(localHost)); + totalLocalHosts.add(localHost); + } + long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), + Collections.emptyList()); + localSplits.add(split); + } + + ListMultimap result = null; + // Run 3 times to ensure the same results + for (int i = 0; i < 3; ++i) { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + Assertions.assertEquals(policy.numBackends(), backendNum); + int totalSplitNum = 0; + + List totalSplits = new ArrayList<>(); + totalSplits.addAll(remoteSplits); + totalSplits.addAll(localSplits); + Collections.shuffle(totalSplits); + Multimap assignment = 
policy.computeScanRangeAssignment(totalSplits); + if (i == 0) { + result = ArrayListMultimap.create(assignment); + } else { + Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment)); + + } + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + ++totalSplitNum; + if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) { + for (String host : fileSplit.getHosts()) { + Assert.assertTrue(totalLocalHosts.contains(host)); + } + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertEquals(totalSplits.size(), totalSplitNum); + + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } } - private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setPath(path); - rangeDesc.setStartOffset(startOffset); - rangeDesc.setSize(size); - return rangeDesc; + @Test + public void testNonAliveNodes() throws UserException { + SystemInfoService service = new SystemInfoService(); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + Random random = new Random(); + int backendNum = random.nextInt(100 - 1) + 1; + + int minOctet3 = 0; + int maxOctet3 = 250; + int minOctet4 = 1; + int maxOctet4 = 250; + Set backendIds = new HashSet<>(); + Set ipAddresses = new HashSet<>(); + int aliveBackendNum = 0; + for (int i = 0; i < backendNum; i++) { + String ipAddress; + do { + int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3; + int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4; + ipAddress = 192 + "." + 168 + "." + octet3 + "." 
+ octet4; + } while (!ipAddresses.add(ipAddress)); + + int backendId; + do { + backendId = random.nextInt(90000) + 10000; + } while (!backendIds.add(backendId)); + + Backend backend = new Backend(backendId, ipAddress, 9050); + if (i % 2 == 0) { + ++aliveBackendNum; + backend.setAlive(true); + } else { + backend.setAlive(false); + } + service.addBackend(backend); + } + + List remoteSplits = new ArrayList<>(); + int splitCount = random.nextInt(1000 - 100) + 100; + for (int i = 0; i < splitCount; ++i) { + long splitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, splitLength, splitLength, 0, null, Collections.emptyList()); + remoteSplits.add(split); + } + + List localSplits = new ArrayList<>(); + int localSplitCount = random.nextInt(1000 - 100) + 100; + Set totalLocalHosts = new HashSet<>(); + for (int i = 0; i < localSplitCount; ++i) { + int localHostNum = random.nextInt(3 - 1) + 1; + Set localHosts = new HashSet<>(); + String localHost; + for (int j = 0; j < localHostNum; ++j) { + do { + localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + } while (!localHosts.add(localHost)); + totalLocalHosts.add(localHost); + } + long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; + FileSplit split = new FileSplit(new Path( + "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), + 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), + Collections.emptyList()); + localSplits.add(split); + } + + Multimap result = null; + // Run 3 times to ensure the same results + for (int i = 0; i < 3; ++i) { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + Assertions.assertEquals(policy.numBackends(), aliveBackendNum); + int totalSplitNum = 0; + List totalSplits = new ArrayList<>(); + totalSplits.addAll(remoteSplits); + totalSplits.addAll(localSplits); + Collections.shuffle(totalSplits); + Multimap assignment = policy.computeScanRangeAssignment(totalSplits); + if (i == 0) { + result = ArrayListMultimap.create(assignment); + } else { + Assertions.assertEquals(result, assignment); + } + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + if (assignedSplits.size() <= minAssignedSplitNum) { + minAssignedSplitNum = assignedSplits.size(); + } + if (assignedSplits.size() >= maxAssignedSplitNum) { + maxAssignedSplitNum = assignedSplits.size(); + } + + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + ++totalSplitNum; + if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) { + for (String host : fileSplit.getHosts()) { + Assert.assertTrue(totalLocalHosts.contains(host)); + } + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Assert.assertEquals(totalSplits.size(), totalSplitNum); + + Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance); + } + } + + private static class TestSplitHashKey { + private String path; + private long start; + private long length; + + public TestSplitHashKey(String path, long start, long length) { + 
this.path = path; + this.start = start; + this.length = length; + } + + public String getPath() { + return path; + } + + public long getStart() { + return start; + } + + public long getLength() { + return length; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestSplitHashKey that = (TestSplitHashKey) o; + return start == that.start && length == that.length && Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(path, start, length); + } + } + + @Test + public void testConsistentHashWhenNodeChanged() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); + + Map originSplitAssignedBackends = new HashMap<>(); + { + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + // Set these options to ensure that the consistent hash algorithm is consistent. 
+ policy.setEnableSplitsRedistribution(false); + Config.split_assigner_min_consistent_hash_candidate_num = 1; + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + originSplitAssignedBackends.put( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()), backend); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + } + + // remove a node + { + service.dropBackend(backend3.getId()); + int changed = 0; + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + int backendNum = 2; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + Backend origin = originSplitAssignedBackends.get( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength())); + if (!backend.equals(origin)) { + changed += 1; + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + + float moveRatio = changed * 1.0f / assignment.values().size(); + System.out.printf("Remove a node: move ratio = %.2f\n", moveRatio); + Assertions.assertEquals(0.375, moveRatio); + } + + // add a node + { + Backend backend4 = new Backend(10004L, "172.30.0.128", 9050); + backend4.setAlive(true); + service.addBackend(backend4); + int changed = 0; + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy.init(); + int backendNum = 3; + Assertions.assertEquals(policy.numBackends(), backendNum); + Multimap assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + Backend origin = originSplitAssignedBackends.get( + new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength())); + if (!backend.equals(origin)) { + changed += 1; + } + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); + } + Map stats = policy.getAssignedWeightPerBackend(); + for (Map.Entry entry : stats.entrySet()) { + System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue()); + } + + float moveRatio = changed * 1.0f / assignment.values().size(); + System.out.printf("Add a node, move ratio = %.2f\n", moveRatio); + Assertions.assertEquals(0.25, moveRatio); + } + } + + private static 
boolean areMultimapsEqualIgnoringOrder(
+            Multimap<Backend, Split> multimap1, Multimap<Backend, Split> multimap2) {
+        Collection<Map.Entry<Backend, Split>> entries1 = multimap1.entries();
+        Collection<Map.Entry<Backend, Split>> entries2 = multimap2.entries();
+
+        return entries1.containsAll(entries2) && entries2.containsAll(entries1);
+    }
 }
+
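
Note on the remote-split tests above (testRemoteSplits, testConsistentHash, testGenerateRandomly, testNonAliveNodes): they all assert that the per-backend split counts differ by at most Config.split_assigner_max_split_num_variance, and the randomized tests additionally check that repeated runs over shuffled copies of the same splits produce an identical assignment. The sketch below illustrates that "hash onto a virtual-node ring, then rebalance" idea in plain Java. It is only an illustration under assumptions: the class and method names (ConsistentHashAssignerSketch, assign, rebalance) and the MD5-based ring hash are invented for the example and are not the FederationBackendPolicy implementation.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashAssignerSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int maxVariance;

    // backends: backend hosts; virtualNodes: virtual nodes per backend on the hash ring;
    // maxVariance: allowed spread in per-backend split counts (assumed >= 1).
    public ConsistentHashAssignerSketch(List<String> backends, int virtualNodes, int maxVariance) {
        this.maxVariance = maxVariance;
        for (String backend : backends) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(backend + "#" + i), backend);
            }
        }
    }

    // Each split path maps to the first virtual node at or after its hash on the ring.
    // The input is sorted and the result map is ordered, so re-running over a shuffled
    // copy of the same splits reproduces the same assignment.
    public Map<String, List<String>> assign(List<String> splitPaths) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String backend : ring.values()) {
            assignment.putIfAbsent(backend, new ArrayList<>());
        }
        List<String> ordered = new ArrayList<>(splitPaths);
        Collections.sort(ordered);
        for (String path : ordered) {
            Map.Entry<Long, String> entry = ring.ceilingEntry(hash(path));
            String backend = (entry != null ? entry : ring.firstEntry()).getValue();
            assignment.get(backend).add(path);
        }
        rebalance(assignment);
        return assignment;
    }

    // Consistent hashing can be uneven; move splits from the most loaded backend to the
    // least loaded one until the split counts differ by at most maxVariance. With
    // maxVariance >= 1 every move strictly shrinks the imbalance, so the loop terminates.
    private void rebalance(Map<String, List<String>> assignment) {
        Comparator<String> bySize = Comparator.comparingInt(b -> assignment.get(b).size());
        while (true) {
            String max = assignment.keySet().stream().max(bySize).get();
            String min = assignment.keySet().stream().min(bySize).get();
            List<String> maxSplits = assignment.get(max);
            if (maxSplits.size() - assignment.get(min).size() <= maxVariance) {
                return;
            }
            assignment.get(min).add(maxSplits.remove(maxSplits.size() - 1));
        }
    }

    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long value = 0;
            for (int i = 0; i < 8; i++) {
                value = (value << 8) | (digest[i] & 0xffL);
            }
            return value;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

After rebalance() the spread is at most maxVariance by construction, which mirrors the Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance assertions in the tests above.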
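
testHasLocalSplits covers the other half of the policy: a split that reports replica hosts (part-00000 on 172.30.0.100, part-00003 on 172.30.0.106) must be assigned to the backend running on that host, while splits without host information fall back to the remote strategy. The sketch below shows that soft-affinity preference; the name pickLocalBackend and its parameters are hypothetical, chosen only to make the idea concrete.

import java.util.Map;
import java.util.Set;

// Illustrative sketch only: soft local affinity. If any replica host of a split matches an
// alive backend, prefer the least-loaded matching backend; otherwise return null so the
// caller falls back to the remote strategy (round robin or consistent hashing).
public final class LocalAffinitySketch {

    private LocalAffinitySketch() {
    }

    public static String pickLocalBackend(Set<String> splitReplicaHosts,
                                          Set<String> aliveBackendHosts,
                                          Map<String, Integer> assignedSplitsPerHost) {
        String best = null;
        for (String host : splitReplicaHosts) {
            if (!aliveBackendHosts.contains(host)) {
                continue; // no alive backend co-located with this replica
            }
            int load = assignedSplitsPerHost.getOrDefault(host, 0);
            if (best == null || load < assignedSplitsPerHost.getOrDefault(best, 0)) {
                best = host; // keep the least-loaded local candidate
            }
        }
        return best;
    }
}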