From 2e95b1c38935f53ed5a16720771d9d45efd91d9e Mon Sep 17 00:00:00 2001 From: xinghuayu007 <1450306854@qq.com> Date: Thu, 31 Dec 2020 09:47:06 +0800 Subject: [PATCH] [Enhancement] Make Colocate table join more load balanced (#5104) When two colocate tables are joined, to perform the join locally, the tablets belonging to the same bucket sequence will be distributed to the same host. When choosing which host for a bucket sequence, it takes a random strategy. A random strategy cannot balance the query task load for a single query. Therefore, this patch takes a round-robin strategy, making buckets distributed evenly. For example, if there are 6 bucket sequences and 3 hosts, it is better to distribute 2 bucket sequences to every host. --- .../java/org/apache/doris/qe/Coordinator.java | 67 ++++++++++--------- .../org/apache/doris/qe/SimpleScheduler.java | 32 +++++++++ .../org/apache/doris/qe/CoordinatorTest.java | 61 +++++++++++++++++ 3 files changed, 127 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0f75ffd0c7..ff05413d56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1251,12 +1251,12 @@ public class Coordinator { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap()); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); - + HashMap assignedBytesPerHost = Maps.newHashMap(); for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { - getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq); + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, 
assignedBytesPerHost); } for(TScanRangeLocations location: locations) { @@ -1274,50 +1274,51 @@ public class Coordinator { } } - // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence. - private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception { - int randomLocation = new Random().nextInt(seqLocation.locations.size()); + // ensure bucket sequences are distributed to every host evenly + private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq, + HashMap assignedBytesPerHost) throws Exception { Reference backendIdRef = new Reference(); - TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef); + selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef); + Backend backend = this.idToBackend.get(backendIdRef.getRef()); + TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } + public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation, + HashMap assignedBytesPerHost, + Reference backendIdRef) throws UserException { + Long minAssignedBytes = Long.MAX_VALUE; + TScanRangeLocation minLocation = null; + Long step = 1L; + for (final TScanRangeLocation location : seqLocation.getLocations()) { + Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); + if (assignedBytes < minAssignedBytes) { + minAssignedBytes = assignedBytes; + minLocation = location; + } + } + TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef); + if 
(assignedBytesPerHost.containsKey(location.server)) { + assignedBytesPerHost.put(location.server, + assignedBytesPerHost.get(location.server) + step); + } else { + assignedBytesPerHost.put(location.server, step); + } + return location; + } + private void computeScanRangeAssignmentByScheduler( final ScanNode scanNode, final List locations, FragmentScanRangeAssignment assignment) throws Exception { HashMap assignedBytesPerHost = Maps.newHashMap(); - Long step = 1L; for (TScanRangeLocations scanRangeLocations : locations) { - // assign this scan range to the host w/ the fewest assigned bytes - Long minAssignedBytes = Long.MAX_VALUE; - TScanRangeLocation minLocation = null; - for (final TScanRangeLocation location : scanRangeLocations.getLocations()) { - Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); - if (assignedBytes < minAssignedBytes) { - minAssignedBytes = assignedBytes; - minLocation = location; - } - } - assignedBytesPerHost.put(minLocation.server, - assignedBytesPerHost.get(minLocation.server) + step); - Reference backendIdRef = new Reference(); - TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id, - scanRangeLocations.getLocations(), this.idToBackend, backendIdRef); - if (!execHostPort.hostname.equals(minLocation.server.hostname) || - execHostPort.port != minLocation.server.port) { - assignedBytesPerHost.put(minLocation.server, - assignedBytesPerHost.get(minLocation.server) - step); - Long id = assignedBytesPerHost.get(execHostPort); - if (id == null) { - assignedBytesPerHost.put(execHostPort, 0L); - } else { - assignedBytesPerHost.put(execHostPort, id + step); - } - } + TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef); + Backend backend = this.idToBackend.get(backendIdRef.getRef()); + TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, 
backendIdRef.getRef()); Map> scanRanges = findOrInsert(assignment, execHostPort, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index d2c88fe7c5..3943a13291 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -92,6 +92,38 @@ public class SimpleScheduler { backends, locations.size())); } + public static TScanRangeLocation getLocation(TScanRangeLocation minLocation, + List locations, + ImmutableMap backends, + Reference backendIdRef) + throws UserException { + if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) { + throw new UserException("scan range location or candidate backends is empty"); + } + Backend backend = backends.get(minLocation.backend_id); + if (isAvailable(backend)) { + backendIdRef.setRef(minLocation.backend_id); + return minLocation; + } else { + for (TScanRangeLocation location : locations) { + if (location.backend_id == minLocation.backend_id) { + continue; + } + // choose the first alive backend(in analysis stage, the locations are random) + Backend candidateBackend = backends.get(location.backend_id); + if (isAvailable(candidateBackend)) { + backendIdRef.setRef(location.backend_id); + return location; + } + } + } + + // no backend returned + throw new UserException("there is no scanNode Backend. 
" + + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()), + backends, locations.size())); + } + public static TNetworkAddress getHost(ImmutableMap backends, Reference backendIdRef) throws UserException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index b2f780f679..35efae288a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -59,6 +59,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; +import org.apache.commons.collections.map.HashedMap; public class CoordinatorTest extends Coordinator { static Planner planner = new Planner(); @@ -469,5 +471,64 @@ public class CoordinatorTest extends Coordinator { } } } + + @Test + public void testGetExecHostPortForFragmentIDAndBucketSeq() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} + TScanRangeLocations tScanRangeLocations = new TScanRangeLocations(); + TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation(); + tScanRangeLocation0.backend_id = 0; + tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050); + TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation(); + tScanRangeLocation1.backend_id = 1; + tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050); + TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation(); + tScanRangeLocation2.backend_id = 2; + tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050); + tScanRangeLocations.locations = new ArrayList<>(); + tScanRangeLocations.locations.add(tScanRangeLocation0); + tScanRangeLocations.locations.add(tScanRangeLocation1); + 
tScanRangeLocations.locations.add(tScanRangeLocation2); + + // init all backend + Backend backend0 = new Backend(0, "0.0.0.0", 9060); + backend0.setAlive(true); + backend0.setBePort(9050); + Backend backend1 = new Backend(1, "0.0.0.1", 9060); + backend1.setAlive(true); + backend1.setBePort(9050); + Backend backend2 = new Backend(2, "0.0.0.2", 9060); + backend2.setAlive(true); + backend2.setBePort(9050); + + ImmutableMap idToBackend = + new ImmutableMap.Builder(). + put(0l, backend0). + put(1l, backend1). + put(2l, backend2).build(); + Deencapsulation.setField(coordinator, "idToBackend", idToBackend); + Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); + fragmentIdToSeqToAddressMap.put(planFragmentId, new HashedMap()); + Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap); + List locations = new ArrayList<>(); + locations.add(tScanRangeLocations); + + HashMap assignedBytesPerHost = Maps.newHashMap(); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 1, assignedBytesPerHost); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 2, assignedBytesPerHost); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 3, assignedBytesPerHost); + List hosts = new ArrayList<>(); + for (Map.Entry item:assignedBytesPerHost.entrySet()) { + Assert.assertTrue((Long)item.getValue() == 1); + TNetworkAddress addr = (TNetworkAddress)item.getKey(); + hosts.add(addr.hostname); + } + Assert.assertTrue(hosts.size() == 3); + } }