[Opt](multi-catalog) Opt split assignment to resolve uneven distribution. (#30390)
[Opt] (multi-catalog) Optimize split assignment to resolve uneven distribution. Currently only for `FileQueryScanNode`. Following the Trino implementation: - Local node soft affinity optimization: prefer the local replication node. - Remote splits use the consistent hash algorithm when the file cache is turned on; because consistent hashing can be uneven, splits are re-adjusted so that the maximum and minimum split counts across hosts differ by at most `max_split_num_variance` splits. - Remote splits use the round-robin algorithm when the file cache is turned off.
This commit is contained in:
@ -0,0 +1,228 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Iterators;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* A priority queue with constant time contains(E) and log time remove(E)
|
||||
* Ties are broken by insertion order
|
||||
*/
|
||||
public final class IndexedPriorityQueue<E>
|
||||
implements UpdateablePriorityQueue<E> {
|
||||
private final Map<E, Entry<E>> index = new HashMap<>();
|
||||
private final Set<Entry<E>> queue;
|
||||
private long generation;
|
||||
|
||||
public IndexedPriorityQueue() {
|
||||
this(PriorityOrdering.HIGH_TO_LOW);
|
||||
}
|
||||
|
||||
public IndexedPriorityQueue(PriorityOrdering priorityOrdering) {
|
||||
switch (priorityOrdering) {
|
||||
case LOW_TO_HIGH:
|
||||
queue = new TreeSet<>(
|
||||
Comparator.comparingLong((Entry<E> entry) -> entry.getPriority())
|
||||
.thenComparingLong(Entry::getGeneration));
|
||||
break;
|
||||
case HIGH_TO_LOW:
|
||||
queue = new TreeSet<>((entry1, entry2) -> {
|
||||
int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority());
|
||||
if (priorityComparison != 0) {
|
||||
return priorityComparison;
|
||||
}
|
||||
return Long.compare(entry1.getGeneration(), entry2.getGeneration());
|
||||
});
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addOrUpdate(E element, long priority) {
|
||||
Entry<E> entry = index.get(element);
|
||||
if (entry != null) {
|
||||
if (entry.getPriority() == priority) {
|
||||
return false;
|
||||
}
|
||||
queue.remove(entry);
|
||||
Entry<E> newEntry = new Entry<>(element, priority, entry.getGeneration());
|
||||
queue.add(newEntry);
|
||||
index.put(element, newEntry);
|
||||
return false;
|
||||
}
|
||||
Entry<E> newEntry = new Entry<>(element, priority, generation);
|
||||
generation++;
|
||||
queue.add(newEntry);
|
||||
index.put(element, newEntry);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(E element) {
|
||||
return index.containsKey(element);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(E element) {
|
||||
Entry<E> entry = index.remove(element);
|
||||
if (entry != null) {
|
||||
queue.remove(entry);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public E poll() {
|
||||
Entry<E> entry = pollEntry();
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
return entry.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public E peek() {
|
||||
Entry<E> entry = peekEntry();
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
return entry.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return queue.isEmpty();
|
||||
}
|
||||
|
||||
public Prioritized<E> getPrioritized(E element) {
|
||||
Entry<E> entry = index.get(element);
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Prioritized<>(entry.getValue(), entry.getPriority());
|
||||
}
|
||||
|
||||
public Prioritized<E> pollPrioritized() {
|
||||
Entry<E> entry = pollEntry();
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
return new Prioritized<>(entry.getValue(), entry.getPriority());
|
||||
}
|
||||
|
||||
private Entry<E> pollEntry() {
|
||||
Iterator<Entry<E>> iterator = queue.iterator();
|
||||
if (!iterator.hasNext()) {
|
||||
return null;
|
||||
}
|
||||
Entry<E> entry = iterator.next();
|
||||
iterator.remove();
|
||||
Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index");
|
||||
return entry;
|
||||
}
|
||||
|
||||
public Prioritized<E> peekPrioritized() {
|
||||
Entry<E> entry = peekEntry();
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
return new Prioritized<>(entry.getValue(), entry.getPriority());
|
||||
}
|
||||
|
||||
public Entry<E> peekEntry() {
|
||||
Iterator<Entry<E>> iterator = queue.iterator();
|
||||
if (!iterator.hasNext()) {
|
||||
return null;
|
||||
}
|
||||
return iterator.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<E> iterator() {
|
||||
return Iterators.transform(queue.iterator(), Entry::getValue);
|
||||
}
|
||||
|
||||
public enum PriorityOrdering {
|
||||
LOW_TO_HIGH,
|
||||
HIGH_TO_LOW
|
||||
}
|
||||
|
||||
private static final class Entry<E> {
|
||||
private final E value;
|
||||
private final long priority;
|
||||
private final long generation;
|
||||
|
||||
private Entry(E value, long priority, long generation) {
|
||||
this.value = Objects.requireNonNull(value, "value is null");
|
||||
this.priority = priority;
|
||||
this.generation = generation;
|
||||
}
|
||||
|
||||
public E getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public long getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public long getGeneration() {
|
||||
return generation;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Prioritized<V> {
|
||||
private final V value;
|
||||
private final long priority;
|
||||
|
||||
public Prioritized(V value, long priority) {
|
||||
this.value = Objects.requireNonNull(value, "value is null");
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public V getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public long getPriority() {
|
||||
return priority;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
36
fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
Normal file
36
fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
Normal file
@ -0,0 +1,36 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
/**
 * Minimal queue abstraction used by the split-assignment scheduler.
 * Implementations in this package (see IndexedPriorityQueue) return null from
 * poll()/peek() when the queue is empty rather than throwing.
 */
interface Queue<E> {
    /** Returns true if the element is currently in the queue. */
    boolean contains(E element);

    /** Removes the element if present; returns true if it was removed. */
    boolean remove(E element);

    /** Removes and returns the head element, or null if the queue is empty. */
    E poll();

    /** Returns the head element without removing it, or null if the queue is empty. */
    E peek();

    /** Returns the number of elements currently in the queue. */
    int size();

    /** Returns true if the queue has no elements. */
    boolean isEmpty();
}
|
||||
|
||||
@ -0,0 +1,63 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
 * Iterates over a snapshot of a collection in uniformly random order.
 * Each call to next() performs one in-place Fisher-Yates style swap, so a full
 * pass visits every element exactly once. reset() allows the same snapshot to
 * be re-shuffled and iterated again.
 */
public class ResettableRandomizedIterator<T>
        implements Iterator<T> {
    private final List<T> list;
    private int position;

    public ResettableRandomizedIterator(Collection<T> elements) {
        this.list = new ArrayList<>(elements);
    }

    /** Rewinds the iterator; a subsequent pass yields a fresh random order. */
    public void reset() {
        position = 0;
    }

    @Override
    public boolean hasNext() {
        return position < list.size();
    }

    @Override
    public T next() {
        if (position >= list.size()) {
            throw new NoSuchElementException();
        }

        // Pick a random element from the not-yet-returned suffix [position, size)
        // and swap it into the current slot.
        int pick = ThreadLocalRandom.current().nextInt(position, list.size());
        T chosen = list.get(pick);
        list.set(pick, list.get(position));
        list.set(position, chosen);
        position++;

        return chosen;
    }
}
|
||||
|
||||
@ -0,0 +1,27 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/UpdateablePriorityQueue.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
/**
 * A queue whose elements carry a long priority that can be updated in place.
 * Iteration order follows the queue's priority ordering.
 */
interface UpdateablePriorityQueue<E>
        extends Queue<E>, Iterable<E> {
    /**
     * Inserts the element with the given priority, or updates its priority if
     * already present.
     *
     * @return true if the element was newly added, false if it was already present
     */
    boolean addOrUpdate(E element, long priority);
}
|
||||
|
||||
@ -17,11 +17,16 @@
|
||||
|
||||
package org.apache.doris.common.util;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.hash.Funnel;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hasher;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
@ -89,14 +94,38 @@ public class ConsistentHash<K, N> {
|
||||
}
|
||||
}
|
||||
|
||||
public N getNode(K key) {
|
||||
if (ring.isEmpty()) {
|
||||
return null;
|
||||
public List<N> getNode(K key, int count) {
|
||||
int nodeCount = ring.values().size();
|
||||
if (count > nodeCount) {
|
||||
count = nodeCount;
|
||||
}
|
||||
|
||||
Set<N> uniqueNodes = new LinkedHashSet<>();
|
||||
|
||||
Hasher hasher = hashFunction.newHasher();
|
||||
Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
|
||||
|
||||
SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
|
||||
hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
|
||||
return ring.get(hashKey).getNode();
|
||||
// Start reading from tail
|
||||
for (Map.Entry<Long, VirtualNode> entry : tailMap.entrySet()) {
|
||||
uniqueNodes.add(entry.getValue().node);
|
||||
if (uniqueNodes.size() == count) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (uniqueNodes.size() < count) {
|
||||
// Start reading from the head as we have exhausted tail
|
||||
SortedMap<Long, VirtualNode> headMap = ring.headMap(hashKey);
|
||||
for (Map.Entry<Long, VirtualNode> entry : headMap.entrySet()) {
|
||||
uniqueNodes.add(entry.getValue().node);
|
||||
if (uniqueNodes.size() == count) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ImmutableList.copyOf(uniqueNodes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode {
|
||||
// set to false means this scan node does not need to check column priv.
|
||||
protected boolean needCheckColumnPriv;
|
||||
|
||||
protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
|
||||
protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
|
||||
&& ConnectContext.get().getSessionVariable().enableFileCache)
|
||||
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy();
|
||||
|
||||
public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
|
||||
boolean needCheckColumnPriv) {
|
||||
@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode {
|
||||
return scanRangeLocations.size();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -14,28 +14,38 @@
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is referenced from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.IndexedPriorityQueue;
|
||||
import org.apache.doris.common.ResettableRandomizedIterator;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.ConsistentHash;
|
||||
import org.apache.doris.mysql.privilege.UserProperty;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.BeSelectionPolicy;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.Funnel;
|
||||
import com.google.common.hash.Hashing;
|
||||
@ -45,12 +55,16 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
@ -59,35 +73,47 @@ public class FederationBackendPolicy {
|
||||
private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
|
||||
private final List<Backend> backends = Lists.newArrayList();
|
||||
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
|
||||
private final SecureRandom random = new SecureRandom();
|
||||
private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
|
||||
|
||||
public Map<Backend, Long> getAssignedWeightPerBackend() {
|
||||
return assignedWeightPerBackend;
|
||||
}
|
||||
|
||||
private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
|
||||
|
||||
private ConsistentHash<Split, Backend> consistentHash;
|
||||
|
||||
private int nextBe = 0;
|
||||
private boolean initialized = false;
|
||||
|
||||
private NodeSelectionStrategy nodeSelectionStrategy;
|
||||
private boolean enableSplitsRedistribution = true;
|
||||
|
||||
// Create a ConsistentHash ring may be a time-consuming operation, so we cache it.
|
||||
private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
|
||||
private static LoadingCache<HashCacheKey, ConsistentHash<Split, Backend>> consistentHashCache;
|
||||
|
||||
static {
|
||||
consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
|
||||
.build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
|
||||
.build(new CacheLoader<HashCacheKey, ConsistentHash<Split, Backend>>() {
|
||||
@Override
|
||||
public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
|
||||
return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
|
||||
new BackendHash(), key.bes, Config.virtual_node_number);
|
||||
public ConsistentHash<Split, Backend> load(HashCacheKey key) {
|
||||
return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(),
|
||||
new BackendHash(), key.bes, Config.split_assigner_virtual_node_number);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static class HashCacheKey {
|
||||
// sorted backend ids as key
|
||||
private List<Long> beIds;
|
||||
private List<String> beHashKeys;
|
||||
// backends is not part of key, just an attachment
|
||||
private List<Backend> bes;
|
||||
|
||||
        HashCacheKey(List<Backend> backends) {
            // The backend objects themselves are an attachment used to build the ring;
            // they are not part of the cache key's equality.
            this.bes = backends;
            this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
            // Sorted "id/host/port" strings form the actual cache key (see equals/hashCode),
            // so the key changes when a backend moves to a different host or port.
            this.beHashKeys = backends.stream().map(b ->
                    String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort()))
                    .sorted()
                    .collect(Collectors.toList());
        }
|
||||
|
||||
@Override
|
||||
@ -98,20 +124,28 @@ public class FederationBackendPolicy {
|
||||
if (!(obj instanceof HashCacheKey)) {
|
||||
return false;
|
||||
}
|
||||
return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
|
||||
return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(beIds);
|
||||
return Objects.hash(beHashKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HashCache{" + "beIds=" + beIds + '}';
|
||||
return "HashCache{" + "beHashKeys=" + beHashKeys + '}';
|
||||
}
|
||||
}
|
||||
|
||||
    /**
     * Creates a policy that assigns remote splits using the given strategy
     * (ROUND_ROBIN, RANDOM, or CONSISTENT_HASHING).
     */
    public FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) {
        this.nodeSelectionStrategy = nodeSelectionStrategy;
    }
||||
|
||||
    /** Creates a policy with the default ROUND_ROBIN strategy for remote splits. */
    public FederationBackendPolicy() {
        this(NodeSelectionStrategy.ROUND_ROBIN);
    }
||||
|
||||
public void init() throws UserException {
|
||||
if (!initialized) {
|
||||
init(Collections.emptyList());
|
||||
@ -152,6 +186,10 @@ public class FederationBackendPolicy {
|
||||
if (backends.isEmpty()) {
|
||||
throw new UserException("No available backends");
|
||||
}
|
||||
for (Backend backend : backends) {
|
||||
assignedWeightPerBackend.put(backend, 0L);
|
||||
}
|
||||
|
||||
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
|
||||
try {
|
||||
consistentHash = consistentHashCache.get(new HashCacheKey(backends));
|
||||
@ -166,23 +204,280 @@ public class FederationBackendPolicy {
|
||||
return selectedBackend;
|
||||
}
|
||||
|
||||
public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
|
||||
return consistentHash.getNode(scanRangeLocations);
|
||||
    // Test hook: allows disabling the post-assignment redistribution step
    // (equateDistribution) so tests can observe the raw first-pass assignment.
    @VisibleForTesting
    public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) {
        this.enableSplitsRedistribution = enableSplitsRedistribution;
    }
|
||||
|
||||
// Try to find a local BE, if not exists, use `getNextBe` instead
|
||||
public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
|
||||
List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
|
||||
    /**
     * Assigns each split to a backend, balancing assigned weight across backends.
     *
     * Stages:
     * 1. Sort splits by (path, start, length) for a stable, cache-friendly order.
     * 2. If split_assigner_optimized_local_scheduling is enabled, a split whose replica
     *    hosts match live backends is assigned to the least-loaded such backend.
     * 3. Remaining splits choose candidate backends via the configured strategy
     *    (ROUND_ROBIN / RANDOM / CONSISTENT_HASHING); one candidate is picked by
     *    chooseNodeForSplit (defined elsewhere in this class) and the rest are
     *    recorded on the split as alternative hosts.
     * 4. If any locality- or hash-based assignment happened, equateDistribution()
     *    re-balances per-backend weight afterwards.
     *
     * @param splits the splits to schedule; NOTE: this list is sorted in place
     * @return multimap from each chosen backend to the splits assigned to it
     * @throws UserException if no candidate backend is available for some split
     */
    public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
        // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible.
        splits.sort((split1, split2) -> {
            int pathComparison = split1.getPathString().compareTo(split2.getPathString());
            if (pathComparison != 0) {
                return pathComparison;
            }

            int startComparison = Long.compare(split1.getStart(), split2.getStart());
            if (startComparison != 0) {
                return startComparison;
            }
            return Long.compare(split1.getLength(), split2.getLength());
        });

        ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();

        List<Split> remainingSplits = null;

        // Flatten the host -> backends map into a single candidate list.
        List<Backend> backends = new ArrayList<>();
        for (List<Backend> backendList : backendMap.values()) {
            backends.addAll(backendList);
        }
        ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(backends);

        // Becomes true once any locality- or consistent-hash-based choice was made,
        // which can skew load and so triggers the re-balancing pass at the end.
        boolean splitsToBeRedistributed = false;

        // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain
        // locality information
        if (Config.split_assigner_optimized_local_scheduling) {
            remainingSplits = new ArrayList<>(splits.size());
            for (int i = 0; i < splits.size(); ++i) {
                Split split = splits.get(i);
                if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) {
                    // Backends co-located with one of the split's replica hosts.
                    List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts());

                    // Prefer the least-loaded local backend.
                    Optional<Backend> chosenNode = candidateNodes.stream()
                            .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode)));

                    if (chosenNode.isPresent()) {
                        Backend selectedBackend = chosenNode.get();
                        assignment.put(selectedBackend, split);
                        assignedWeightPerBackend.put(selectedBackend,
                                assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
                        splitsToBeRedistributed = true;
                        continue;
                    }
                }
                // No local backend found; fall through to strategy-based assignment.
                remainingSplits.add(split);
            }
        } else {
            remainingSplits = splits;
        }

        for (Split split : remainingSplits) {
            List<Backend> candidateNodes;
            if (!split.isRemotelyAccessible()) {
                // Non-remotely-accessible splits must run on a backend holding the data.
                candidateNodes = selectExactNodes(backendMap, split.getHosts());
            } else {
                switch (nodeSelectionStrategy) {
                    case ROUND_ROBIN: {
                        Backend selectedBackend = backends.get(nextBe++);
                        nextBe = nextBe % backends.size();
                        candidateNodes = ImmutableList.of(selectedBackend);
                        break;
                    }
                    case RANDOM: {
                        randomCandidates.reset();
                        candidateNodes = selectNodes(Config.split_assigner_min_random_candidate_num, randomCandidates);
                        break;
                    }
                    case CONSISTENT_HASHING: {
                        candidateNodes = consistentHash.getNode(split,
                                Config.split_assigner_min_consistent_hash_candidate_num);
                        splitsToBeRedistributed = true;
                        break;
                    }
                    default: {
                        throw new RuntimeException();
                    }
                }
            }
            if (candidateNodes.isEmpty()) {
                LOG.debug("No nodes available to schedule {}. Available nodes {}", split,
                        backends);
                throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
            }

            Backend selectedBackend = chooseNodeForSplit(candidateNodes);
            // Remember the runners-up so redistributeSplit() can prefer moving this
            // split to a backend that was already a candidate for it.
            List<Backend> alternativeBackends = new ArrayList<>(candidateNodes);
            alternativeBackends.remove(selectedBackend);
            split.setAlternativeHosts(
                    alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList()));
            assignment.put(selectedBackend, split);
            assignedWeightPerBackend.put(selectedBackend,
                    assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
        }

        if (enableSplitsRedistribution && splitsToBeRedistributed) {
            equateDistribution(assignment);
        }
        return assignment;
    }
|
||||
|
||||
    /**
     * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and
     * a minHeap based on the number of splits that are assigned to them. Splits are redistributed, one at a time,
     * from a maxNode to a minNode until we have as uniform a distribution as possible.
     *
     * @param assignment the node-splits multimap after the first and the second stage; mutated in place
     */
    private void equateDistribution(ListMultimap<Backend, Split> assignment) {
        if (assignment.isEmpty()) {
            return;
        }

        List<Backend> allNodes = new ArrayList<>();
        for (List<Backend> backendList : backendMap.values()) {
            allNodes.addAll(backendList);
        }
        // Deterministic ordering so generation-based tie breaking in the queues is stable.
        Collections.sort(allNodes, Comparator.comparing(Backend::getId));

        if (allNodes.size() < 2) {
            return;
        }

        // IndexedPriorityQueue defaults to HIGH_TO_LOW, i.e. poll() returns the
        // heaviest node first (a max-heap keyed by assigned weight).
        IndexedPriorityQueue<Backend> maxNodes = new IndexedPriorityQueue<>();
        for (Backend node : allNodes) {
            maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node));
        }

        // Long.MAX_VALUE - weight inverts the ordering, turning the same queue type
        // into a min-heap: poll() returns the lightest node first.
        IndexedPriorityQueue<Backend> minNodes = new IndexedPriorityQueue<>();
        for (Backend node : allNodes) {
            minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node));
        }

        while (true) {
            if (maxNodes.isEmpty()) {
                return;
            }

            // fetch min and max node
            Backend maxNode = maxNodes.poll();
            Backend minNode = minNodes.poll();

            // Allow some degree of non uniformity when assigning splits to nodes. Usually data distribution
            // among nodes in a cluster won't be fully uniform (e.g. because hash function with non-uniform
            // distribution is used like consistent hashing). In such case it makes sense to assign splits to nodes
            // with data because of potential savings in network throughput and CPU time.
            if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode)
                    <= SplitWeight.rawValueForStandardSplitCount(Config.split_assigner_max_split_num_variance)) {
                return;
            }

            // move split from max to min
            Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode);

            assignedWeightPerBackend.put(maxNode,
                    assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue());
            assignedWeightPerBackend.put(minNode, Math.addExact(
                    assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue()));

            // add max back into maxNodes only if it still has assignments
            if (assignment.containsKey(maxNode)) {
                maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode));
            }

            // Add or update both the Priority Queues with the updated node priorities
            maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode));
            minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode));
            minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode));
        }
    }
|
||||
|
||||
    /**
     * The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to
     * redistribute a Non-local split if possible. This case is possible when there are multiple queries running
     * simultaneously. If a Non-local split cannot be found in the maxNode, next split is selected and reassigned.
     *
     * @param assignment the current backend-to-splits assignment; mutated in place
     * @param fromNode   the overloaded node a split is taken from.
     *                   NOTE(review): assumed to have at least one assigned split —
     *                   otherwise splitIterator.remove() below throws; confirm callers guarantee this
     * @param toNode     the underloaded node that receives the split
     * @return the split that was moved
     */
    @VisibleForTesting
    public static Split redistributeSplit(Multimap<Backend, Split> assignment, Backend fromNode,
            Backend toNode) {
        Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
        Split splitToBeRedistributed = null;
        while (splitIterator.hasNext()) {
            Split split = splitIterator.next();
            // Try to select non-local split for redistribution
            if (split.getHosts() != null && !isSplitLocal(
                    split.getHosts(), fromNode.getHost())) {
                splitToBeRedistributed = split;
                break;
            }
        }
        // Select split if maxNode has no non-local splits in the current batch of assignment
        if (splitToBeRedistributed == null) {
            splitIterator = assignment.get(fromNode).iterator();
            while (splitIterator.hasNext()) {
                splitToBeRedistributed = splitIterator.next();
                // if toNode has split replication, transfer this split firstly
                if (splitToBeRedistributed.getHosts() != null && isSplitLocal(
                        splitToBeRedistributed.getHosts(), toNode.getHost())) {
                    break;
                }
                // if toNode is split alternative host, transfer this split firstly
                if (splitToBeRedistributed.getAlternativeHosts() != null && isSplitLocal(
                        splitToBeRedistributed.getAlternativeHosts(), toNode.getHost())) {
                    break;
                }
            }
        }
        // splitIterator is still positioned at the chosen split: remove it from
        // fromNode's collection and hand it to toNode.
        splitIterator.remove();
        assignment.put(toNode, splitToBeRedistributed);
        return splitToBeRedistributed;
    }
|
||||
|
||||
private static boolean isSplitLocal(String[] splitHosts, String host) {
|
||||
for (String splitHost : splitHosts) {
|
||||
if (splitHost.equals(host)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean isSplitLocal(List<String> splitHosts, String host) {
|
||||
for (String splitHost : splitHosts) {
|
||||
if (splitHost.equals(host)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static List<Backend> selectExactNodes(Map<String, List<Backend>> backendMap, String[] hosts) {
|
||||
Set<Backend> chosen = new LinkedHashSet<>();
|
||||
|
||||
for (String host : hosts) {
|
||||
List<Backend> backends = backendMap.get(host);
|
||||
if (CollectionUtils.isNotEmpty(backends)) {
|
||||
candidateBackends.add(backends.get(random.nextInt(backends.size())));
|
||||
if (backendMap.containsKey(host)) {
|
||||
backendMap.get(host).stream()
|
||||
.forEach(chosen::add);
|
||||
}
|
||||
}
|
||||
return ImmutableList.copyOf(chosen);
|
||||
}
|
||||
|
||||
public static List<Backend> selectNodes(int limit, Iterator<Backend> candidates) {
|
||||
Preconditions.checkArgument(limit > 0, "limit must be at least 1");
|
||||
|
||||
List<Backend> selected = new ArrayList<>(limit);
|
||||
while (selected.size() < limit && candidates.hasNext()) {
|
||||
selected.add(candidates.next());
|
||||
}
|
||||
|
||||
return selected;
|
||||
}
|
||||
|
||||
private Backend chooseNodeForSplit(List<Backend> candidateNodes) {
|
||||
Backend chosenNode = null;
|
||||
long minWeight = Long.MAX_VALUE;
|
||||
|
||||
for (Backend node : candidateNodes) {
|
||||
long queuedWeight = assignedWeightPerBackend.get(node);
|
||||
if (queuedWeight <= minWeight) {
|
||||
chosenNode = node;
|
||||
minWeight = queuedWeight;
|
||||
}
|
||||
}
|
||||
|
||||
return CollectionUtils.isEmpty(candidateBackends)
|
||||
? getNextConsistentBe(scanRangeLocations)
|
||||
: candidateBackends.get(random.nextInt(candidateBackends.size()));
|
||||
return chosenNode;
|
||||
}
|
||||
|
||||
public int numBackends() {
|
||||
@ -200,15 +495,13 @@ public class FederationBackendPolicy {
|
||||
}
|
||||
}
|
||||
|
||||
private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
|
||||
private static class SplitHash implements Funnel<Split> {
|
||||
@Override
|
||||
public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
|
||||
Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
|
||||
for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
|
||||
primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
|
||||
primitiveSink.putLong(desc.start_offset);
|
||||
primitiveSink.putLong(desc.size);
|
||||
}
|
||||
public void funnel(Split split, PrimitiveSink primitiveSink) {
|
||||
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
|
||||
primitiveSink.putLong(split.getStart());
|
||||
primitiveSink.putLong(split.getLength());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -68,13 +68,14 @@ import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import lombok.Getter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -314,76 +315,73 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
params.setProperties(locationProperties);
|
||||
}
|
||||
|
||||
boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
|
||||
List<String> pathPartitionKeys = getPathPartitionKeys();
|
||||
for (Split split : inputSplits) {
|
||||
FileSplit fileSplit = (FileSplit) split;
|
||||
TFileType locationType;
|
||||
if (fileSplit instanceof IcebergSplit
|
||||
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
|
||||
locationType = TFileType.FILE_BROKER;
|
||||
} else {
|
||||
locationType = getLocationType(fileSplit.getPath().toString());
|
||||
}
|
||||
|
||||
TScanRangeLocations curLocations = newLocations();
|
||||
// If fileSplit has partition values, use the values collected from hive partitions.
|
||||
// Otherwise, use the values in file path.
|
||||
boolean isACID = false;
|
||||
if (fileSplit instanceof HiveSplit) {
|
||||
HiveSplit hiveSplit = (HiveSplit) split;
|
||||
isACID = hiveSplit.isACID();
|
||||
}
|
||||
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
|
||||
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
|
||||
: fileSplit.getPartitionValues();
|
||||
|
||||
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
|
||||
locationType);
|
||||
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
|
||||
rangeDesc.setCompressType(fileCompressType);
|
||||
if (isACID) {
|
||||
HiveSplit hiveSplit = (HiveSplit) split;
|
||||
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
|
||||
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
|
||||
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
|
||||
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
|
||||
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
|
||||
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
|
||||
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
|
||||
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
|
||||
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
|
||||
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
|
||||
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
|
||||
deleteDeltaDescs.add(deleteDeltaDesc);
|
||||
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
|
||||
for (Backend backend : assignment.keySet()) {
|
||||
Collection<Split> splits = assignment.get(backend);
|
||||
for (Split split : splits) {
|
||||
FileSplit fileSplit = (FileSplit) split;
|
||||
TFileType locationType;
|
||||
if (fileSplit instanceof IcebergSplit
|
||||
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
|
||||
locationType = TFileType.FILE_BROKER;
|
||||
} else {
|
||||
locationType = getLocationType(fileSplit.getPath().toString());
|
||||
}
|
||||
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
|
||||
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
|
||||
rangeDesc.setTableFormatParams(tableFormatFileDesc);
|
||||
}
|
||||
|
||||
setScanParams(rangeDesc, fileSplit);
|
||||
TScanRangeLocations curLocations = newLocations();
|
||||
// If fileSplit has partition values, use the values collected from hive partitions.
|
||||
// Otherwise, use the values in file path.
|
||||
boolean isACID = false;
|
||||
if (fileSplit instanceof HiveSplit) {
|
||||
HiveSplit hiveSplit = (HiveSplit) fileSplit;
|
||||
isACID = hiveSplit.isACID();
|
||||
}
|
||||
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
|
||||
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
|
||||
false, isACID) : fileSplit.getPartitionValues();
|
||||
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
TScanRangeLocation location = new TScanRangeLocation();
|
||||
Backend selectedBackend;
|
||||
if (enableShortCircuitRead) {
|
||||
// Try to find a local BE if enable hdfs short circuit read
|
||||
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
|
||||
} else {
|
||||
// Use consistent hash to assign the same scan range into the same backend among different queries
|
||||
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
|
||||
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
|
||||
locationType);
|
||||
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
|
||||
rangeDesc.setCompressType(fileCompressType);
|
||||
if (isACID) {
|
||||
HiveSplit hiveSplit = (HiveSplit) fileSplit;
|
||||
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
|
||||
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
|
||||
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
|
||||
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
|
||||
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
|
||||
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
|
||||
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
|
||||
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
|
||||
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
|
||||
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
|
||||
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
|
||||
deleteDeltaDescs.add(deleteDeltaDesc);
|
||||
}
|
||||
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
|
||||
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
|
||||
rangeDesc.setTableFormatParams(tableFormatFileDesc);
|
||||
}
|
||||
|
||||
setScanParams(rangeDesc, fileSplit);
|
||||
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
TScanRangeLocation location = new TScanRangeLocation();
|
||||
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
|
||||
location.setBackendId(backend.getId());
|
||||
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
|
||||
curLocations.addToLocations(location);
|
||||
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
|
||||
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
|
||||
fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
|
||||
scanRangeLocations.add(curLocations);
|
||||
this.totalFileSize += fileSplit.getLength();
|
||||
}
|
||||
setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
|
||||
location.setBackendId(selectedBackend.getId());
|
||||
location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
|
||||
curLocations.addToLocations(location);
|
||||
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
|
||||
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
|
||||
fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
|
||||
scanRangeLocations.add(curLocations);
|
||||
this.totalFileSize += fileSplit.getLength();
|
||||
}
|
||||
|
||||
if (ConnectContext.get().getExecutor() != null) {
|
||||
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
|
||||
}
|
||||
@ -518,3 +516,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
|
||||
protected abstract Map<String, String> getLocationProperties() throws UserException;
|
||||
}
|
||||
|
||||
|
||||
@ -42,6 +42,8 @@ public class FileSplit implements Split {
|
||||
// partitionValues would be ["part1", "part2"]
|
||||
protected List<String> partitionValues;
|
||||
|
||||
protected List<String> alternativeHosts;
|
||||
|
||||
public FileSplit(Path path, long start, long length, long fileLength,
|
||||
long modificationTime, String[] hosts, List<String> partitionValues) {
|
||||
this.path = path;
|
||||
@ -67,6 +69,11 @@ public class FileSplit implements Split {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPathString() {
|
||||
return path.toString();
|
||||
}
|
||||
|
||||
public static class FileSplitCreator implements SplitCreator {
|
||||
|
||||
public static final FileSplitCreator DEFAULT = new FileSplitCreator();
|
||||
|
||||
25
fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
vendored
Normal file
25
fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
vendored
Normal file
@ -0,0 +1,25 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
public enum NodeSelectionStrategy {
|
||||
ROUND_ROBIN,
|
||||
RANDOM,
|
||||
CONSISTENT_HASHING
|
||||
}
|
||||
|
||||
130
fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
vendored
Normal file
130
fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
vendored
Normal file
@ -0,0 +1,130 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import com.google.errorprone.annotations.DoNotCall;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Collection;
|
||||
import java.util.function.Function;
|
||||
|
||||
public final class SplitWeight {
|
||||
private static final long UNIT_VALUE = 100;
|
||||
private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE
|
||||
private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE);
|
||||
|
||||
private final long value;
|
||||
|
||||
private SplitWeight(long value) {
|
||||
if (value <= 0) {
|
||||
throw new IllegalArgumentException("value must be > 0, found: " + value);
|
||||
}
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces a {@link SplitWeight} from the raw internal value representation. This method is intended
|
||||
* primarily for JSON deserialization, and connectors should use not call this factory method directly
|
||||
* to construct {@link SplitWeight} instances. Instead, connectors should use
|
||||
* {@link SplitWeight#fromProportion(double)}
|
||||
* to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future.
|
||||
*/
|
||||
@JsonCreator
|
||||
@DoNotCall // For JSON serialization only
|
||||
public static SplitWeight fromRawValue(long value) {
|
||||
return fromRawValueInternal(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight
|
||||
* proportionally, i.e., a parameter of <code>1.0</code> will be equivalent to the standard weight
|
||||
* and a value of <code>0.5</code> will be 1/2 of the standard split weight. Valid arguments
|
||||
* must be greater than zero and finite. Connectors should prefer constructing split weights
|
||||
* using this factory method rather than passing a raw integer value in case the integer representation
|
||||
* of a standard split needs to change in the future.
|
||||
*
|
||||
* @param weight the proportional weight relative to a standard split, expressed as a double
|
||||
* @return a {@link SplitWeight} with a raw value corresponding to the requested proportion
|
||||
*/
|
||||
public static SplitWeight fromProportion(double weight) {
|
||||
if (weight <= 0 || !Double.isFinite(weight)) {
|
||||
throw new IllegalArgumentException("Invalid weight: " + weight);
|
||||
}
|
||||
// Must round up to avoid small weights rounding to 0
|
||||
return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE));
|
||||
}
|
||||
|
||||
private static SplitWeight fromRawValueInternal(long value) {
|
||||
return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value);
|
||||
}
|
||||
|
||||
public static SplitWeight standard() {
|
||||
return STANDARD_WEIGHT;
|
||||
}
|
||||
|
||||
public static long rawValueForStandardSplitCount(int splitCount) {
|
||||
if (splitCount < 0) {
|
||||
throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount);
|
||||
}
|
||||
return Math.multiplyExact(splitCount, UNIT_VALUE);
|
||||
}
|
||||
|
||||
public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter) {
|
||||
long sum = 0;
|
||||
for (T item : collection) {
|
||||
long value = getter.apply(item).getRawValue();
|
||||
sum = Math.addExact(sum, value);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The internal integer representation for this weight value
|
||||
*/
|
||||
@JsonValue
|
||||
public long getRawValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Long.hashCode(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof SplitWeight)) {
|
||||
return false;
|
||||
}
|
||||
return this.value == ((SplitWeight) other).value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (value == UNIT_VALUE) {
|
||||
return "1";
|
||||
}
|
||||
return BigDecimal.valueOf(value, -UNIT_SCALE).stripTrailingZeros().toPlainString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,10 @@
|
||||
|
||||
package org.apache.doris.spi;
|
||||
|
||||
import org.apache.doris.planner.external.SplitWeight;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Split interface. e.g. Tablet for Olap Table.
|
||||
*/
|
||||
@ -26,4 +30,23 @@ public interface Split {
|
||||
|
||||
Object getInfo();
|
||||
|
||||
default SplitWeight getSplitWeight() {
|
||||
return SplitWeight.standard();
|
||||
}
|
||||
|
||||
default boolean isRemotelyAccessible() {
|
||||
return true;
|
||||
}
|
||||
|
||||
String getPathString();
|
||||
|
||||
long getStart();
|
||||
|
||||
long getLength();
|
||||
|
||||
List<String> getAlternativeHosts();
|
||||
|
||||
void setAlternativeHosts(List<String> alternativeHosts);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -622,7 +622,7 @@ public class Backend implements Writable {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, host, heartbeatPort, bePort, isAlive);
|
||||
return Objects.hash(id, host, heartbeatPort, bePort, isAlive.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user