[improvement](filecache) Use consistent hash to assign the same scan range into the same backend among different queries (#16574)

When file cache enabled, running the same query for the second time may be still slow, for `FE` will assign the same 
scan range into different backends among different queries, and the former cached data in `BE` will be useless if the scan range is changed.

So, this PR introduce consistent hash to assign the same scan range into the same backend among different queries.
This commit is contained in:
Ashin Gau
2023-02-10 19:49:33 +08:00
committed by GitHub
parent 1cc735f20b
commit 3929e8214d
2 changed files with 175 additions and 0 deletions

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import com.google.common.hash.Funnel;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Consistent hash algorithm implemented by SortedMap
*/
public class ConsistentHash<K, N> {
/**
* Virtual node for consistent hash algorithm
*/
private class VirtualNode {
private final int replicaIndex;
private final N node;
public VirtualNode(N node, int replicaIndex) {
this.replicaIndex = replicaIndex;
this.node = node;
}
public N getNode() {
return node;
}
public long hashValue() {
Hasher hasher = hashFunction.newHasher();
hasher.putObject(node, nodeFunnel);
hasher.putInt(replicaIndex);
long hash = hasher.hash().asLong();
return hash;
}
}
HashFunction hashFunction;
Funnel<K> keyFunnel;
Funnel<N> nodeFunnel;
private final SortedMap<Long, VirtualNode> ring = new TreeMap<>();
private final int virtualNumber;
public ConsistentHash(
HashFunction hashFunction,
Funnel<K> keyFunnel,
Funnel<N> nodeFunnel,
Collection<N> nodes,
int virtualNumber) {
this.hashFunction = hashFunction;
this.keyFunnel = keyFunnel;
this.nodeFunnel = nodeFunnel;
this.virtualNumber = virtualNumber;
for (N node : nodes) {
addNode(node);
}
}
public void addNode(N node) {
for (int i = 0; i < virtualNumber; i++) {
VirtualNode vNode = new VirtualNode(node, i);
ring.put(vNode.hashValue(), vNode);
}
}
public void removeNode(N node) {
for (int i = 0; i < virtualNumber; i++) {
VirtualNode vNode = new VirtualNode(node, i);
ring.remove(vNode.hashValue());
}
}
public N getNode(K key) {
if (ring.isEmpty()) {
return null;
}
Hasher hasher = hashFunction.newHasher();
Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
return ring.get(hashKey).getNode();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.telemetry.ScopedSpan;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.ConsistentHash;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.ProfileWriter;
@ -59,6 +60,7 @@ import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.ExternalScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
@ -77,6 +79,7 @@ import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanFragmentDestination;
@ -104,6 +107,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
@ -115,10 +121,12 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -134,6 +142,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class Coordinator {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@ -247,6 +256,26 @@ public class Coordinator {
private boolean isPointQuery = false;
private PointQueryExec pointExec = null;
private static class BackendHash implements Funnel<Backend> {
@Override
public void funnel(Backend backend, PrimitiveSink primitiveSink) {
primitiveSink.putBytes(backend.getHost().getBytes(StandardCharsets.UTF_8));
primitiveSink.putInt(backend.getBePort());
}
}
private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
@Override
public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
primitiveSink.putLong(desc.start_offset);
primitiveSink.putLong(desc.size);
}
}
}
// Used for query/insert
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.isBlockQuery = planner.isBlockQuery();
@ -1768,12 +1797,56 @@ public class Coordinator {
return location;
}
private void computeScanRangeAssignmentByConsistentHash(
ScanNode scanNode,
final List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment,
Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
Collection<Backend> aliveBEs = idToBackend.values().stream().filter(SimpleScheduler::isAvailable)
.collect(Collectors.toList());
if (aliveBEs.isEmpty()) {
throw new UserException("No available backends");
}
int virtualNumber = Math.max(Math.min(512 / aliveBEs.size(), 32), 2);
ConsistentHash<TScanRangeLocations, Backend> consistentHash = new ConsistentHash<>(
Hashing.murmur3_128(), new ScanRangeHash(), new BackendHash(), aliveBEs, virtualNumber);
for (TScanRangeLocations scanRangeLocations : locations) {
TScanRangeLocation minLocation = scanRangeLocations.locations.get(0);
Backend backend = consistentHash.getNode(scanRangeLocations);
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.addressToBackendID.put(execHostPort, backend.getId());
// Why only increase 1 in other implementations ?
if (scanRangeLocations.getScanRange().isSetExtScanRange()) {
for (TFileRangeDesc desc : scanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges) {
assignedBytesPerHost.compute(execHostPort, (k, v) -> (v == null) ? desc.size : desc.size + v);
}
}
// Is replicaNumPerHost useful ?
replicaNumPerHost.computeIfPresent(minLocation.server, (k, v) -> v - 1);
Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort, new HashMap<>());
List<TScanRangeParams> scanRangeParamsList =
findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>());
TScanRangeParams scanRangeParams = new TScanRangeParams();
scanRangeParams.scan_range = scanRangeLocations.scan_range;
scanRangeParams.setVolumeId(minLocation.volume_id);
scanRangeParamsList.add(scanRangeParams);
}
}
private void computeScanRangeAssignmentByScheduler(
final ScanNode scanNode,
final List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment,
Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (scanNode instanceof ExternalScanNode) {
// Use consistent hash to assign the same scan range into the same backend among different queries
computeScanRangeAssignmentByConsistentHash(
scanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost);
return;
}
for (TScanRangeLocations scanRangeLocations : locations) {
Reference<Long> backendIdRef = new Reference<Long>();
TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,