From 3929e8214d777dbc8591ea2a7cb44c059efae9fa Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Fri, 10 Feb 2023 19:49:33 +0800 Subject: [PATCH] [improvement](filecache) Use consistent hash to assign the same scan range into the same backend among different queries (#16574) When the file cache is enabled, running the same query a second time may still be slow, because `FE` may assign the same scan range to different backends across different queries, and the data previously cached in `BE` is useless once the scan range is assigned to another backend. This PR therefore introduces consistent hashing to assign the same scan range to the same backend across different queries. --- .../doris/common/util/ConsistentHash.java | 102 ++++++++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 73 +++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java new file mode 100644 index 0000000000..4ef6e4168f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import com.google.common.hash.Funnel; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; + +import java.util.Collection; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Consistent hash algorithm implemented by SortedMap + */ +public class ConsistentHash { + /** + * Virtual node for consistent hash algorithm + */ + private class VirtualNode { + private final int replicaIndex; + private final N node; + + public VirtualNode(N node, int replicaIndex) { + this.replicaIndex = replicaIndex; + this.node = node; + } + + public N getNode() { + return node; + } + + public long hashValue() { + Hasher hasher = hashFunction.newHasher(); + hasher.putObject(node, nodeFunnel); + hasher.putInt(replicaIndex); + long hash = hasher.hash().asLong(); + return hash; + } + } + + HashFunction hashFunction; + Funnel keyFunnel; + Funnel nodeFunnel; + private final SortedMap ring = new TreeMap<>(); + private final int virtualNumber; + + public ConsistentHash( + HashFunction hashFunction, + Funnel keyFunnel, + Funnel nodeFunnel, + Collection nodes, + int virtualNumber) { + this.hashFunction = hashFunction; + this.keyFunnel = keyFunnel; + this.nodeFunnel = nodeFunnel; + this.virtualNumber = virtualNumber; + for (N node : nodes) { + addNode(node); + } + } + + public void addNode(N node) { + for (int i = 0; i < virtualNumber; i++) { + VirtualNode vNode = new VirtualNode(node, i); + ring.put(vNode.hashValue(), vNode); + } + } + + public void removeNode(N node) { + for (int i = 0; i < virtualNumber; i++) { + VirtualNode vNode = new VirtualNode(node, i); + ring.remove(vNode.hashValue()); + } + } + + public N getNode(K key) { + if (ring.isEmpty()) { + return null; + } + Hasher hasher = hashFunction.newHasher(); + Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong(); + SortedMap tailMap = 
ring.tailMap(hashKey); + hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey(); + return ring.get(hashKey).getNode(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 6077291077..77775c713c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -31,6 +31,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.telemetry.ScopedSpan; import org.apache.doris.common.telemetry.Telemetry; +import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListUtil; import org.apache.doris.common.util.ProfileWriter; @@ -59,6 +60,7 @@ import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.planner.external.ExternalScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; @@ -77,6 +79,7 @@ import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentParamsList; +import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPlanFragmentDestination; @@ -104,6 +107,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; +import com.google.common.hash.Funnel; +import com.google.common.hash.Hashing; 
+import com.google.common.hash.PrimitiveSink; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; @@ -115,10 +121,12 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.jetbrains.annotations.NotNull; +import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -134,6 +142,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class Coordinator { private static final Logger LOG = LogManager.getLogger(Coordinator.class); @@ -247,6 +256,26 @@ public class Coordinator { private boolean isPointQuery = false; private PointQueryExec pointExec = null; + private static class BackendHash implements Funnel { + @Override + public void funnel(Backend backend, PrimitiveSink primitiveSink) { + primitiveSink.putBytes(backend.getHost().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putInt(backend.getBePort()); + } + } + + private static class ScanRangeHash implements Funnel { + @Override + public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) { + Preconditions.checkState(scanRange.scan_range.isSetExtScanRange()); + for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) { + primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8)); + primitiveSink.putLong(desc.start_offset); + primitiveSink.putLong(desc.size); + } + } + } + // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); @@ -1768,12 +1797,56 @@ public class Coordinator { 
return location; } + private void computeScanRangeAssignmentByConsistentHash( + ScanNode scanNode, + final List locations, + FragmentScanRangeAssignment assignment, + Map assignedBytesPerHost, + Map replicaNumPerHost) throws Exception { + Collection aliveBEs = idToBackend.values().stream().filter(SimpleScheduler::isAvailable) + .collect(Collectors.toList()); + if (aliveBEs.isEmpty()) { + throw new UserException("No available backends"); + } + int virtualNumber = Math.max(Math.min(512 / aliveBEs.size(), 32), 2); + ConsistentHash consistentHash = new ConsistentHash<>( + Hashing.murmur3_128(), new ScanRangeHash(), new BackendHash(), aliveBEs, virtualNumber); + for (TScanRangeLocations scanRangeLocations : locations) { + TScanRangeLocation minLocation = scanRangeLocations.locations.get(0); + Backend backend = consistentHash.getNode(scanRangeLocations); + TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); + this.addressToBackendID.put(execHostPort, backend.getId()); + // Why only increase 1 in other implementations ? + if (scanRangeLocations.getScanRange().isSetExtScanRange()) { + for (TFileRangeDesc desc : scanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges) { + assignedBytesPerHost.compute(execHostPort, (k, v) -> (v == null) ? desc.size : desc.size + v); + } + } + // Is replicaNumPerHost useful ? 
+ replicaNumPerHost.computeIfPresent(minLocation.server, (k, v) -> v - 1); + + Map> scanRanges = findOrInsert(assignment, execHostPort, new HashMap<>()); + List scanRangeParamsList = + findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>()); + TScanRangeParams scanRangeParams = new TScanRangeParams(); + scanRangeParams.scan_range = scanRangeLocations.scan_range; + scanRangeParams.setVolumeId(minLocation.volume_id); + scanRangeParamsList.add(scanRangeParams); + } + } + private void computeScanRangeAssignmentByScheduler( final ScanNode scanNode, final List locations, FragmentScanRangeAssignment assignment, Map assignedBytesPerHost, Map replicaNumPerHost) throws Exception { + if (scanNode instanceof ExternalScanNode) { + // Use consistent hash to assign the same scan range into the same backend among different queries + computeScanRangeAssignmentByConsistentHash( + scanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost); + return; + } for (TScanRangeLocations scanRangeLocations : locations) { Reference backendIdRef = new Reference(); TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,