From 1035e86e0bc568b9c54e823919aa247baf40faae Mon Sep 17 00:00:00 2001
From: xinghuayu007 <1450306854@qq.com>
Date: Wed, 6 Jan 2021 10:20:16 +0800
Subject: [PATCH] [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)

---
Review notes (not part of the commit message; text between "---" and the
first "diff --git" is ignored when the patch is applied):

- Coordinator: the bucket count is now incremented for the backend that
  SimpleScheduler.getHost() reports through backendIdRef, instead of always
  for the initially chosen buckendId.
- CoordinatorTest: adds testBucketShuffleWithUnaliveBackend, which marks
  backend 0 as not alive and asserts the resulting per-backend bucket counts.
- This copy of the patch had lost its line breaks and every explicit generic
  type argument (e.g. "Map> fragmentIdToScanNodeIds"). Both are restored
  below; type arguments are inferred from how each variable is used. The
  value type Set<Integer> of fragmentIdToScanNodeIds, the exact indentation
  and the position of blank context lines are presumed - confirm against the
  upstream commit before applying.

 .../java/org/apache/doris/qe/Coordinator.java | 14 +++-
 .../org/apache/doris/qe/CoordinatorTest.java  | 65 +++++++++++++++++++
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ff05413d56..f5dd31017c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1560,14 +1560,22 @@ public class Coordinator {
                     break;
                 }
             }
-
-            buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
             Reference<Long> backendIdRef = new Reference<Long>();
             TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
             if (execHostPort == null) {
                 throw new UserException("there is no scanNode Backend");
             }
-
+            //the backend with buckendId is not alive, chose another new backend
+            if (backendIdRef.getRef() != buckendId) {
+                //buckendIdToBucketCountMap does not contain the new backend, insert into it
+                if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
+                } else { //buckendIdToBucketCountMap contains the new backend, update it
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
+                }
+            } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
+                buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+            }
             addressToBackendID.put(execHostPort, backendIdRef.getRef());
             this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 35efae288a..b56fc729c8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator {
         }
         Assert.assertTrue(hosts.size() == 3);
     }
+
+    @Test
+    public void testBucketShuffleWithUnaliveBackend() {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        // init all backend
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(false);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+        Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
+        addressToBackendID.put(tScanRangeLocation0.server, tScanRangeLocation0.backend_id);
+        addressToBackendID.put(tScanRangeLocation1.server, tScanRangeLocation1.backend_id);
+        addressToBackendID.put(tScanRangeLocation2.server, tScanRangeLocation2.backend_id);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                new ImmutableMap.Builder<Long, Backend>().
+                put(0l, backend0).
+                put(1l, backend1).
+                put(2l, backend2).build();
+        Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+        Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long, Integer>();
+        fragmentIdToBuckendIdBucketCountMap.put(planFragmentId, backendIdBucketCountMap);
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+        fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>());
+        Deencapsulation.setField(controller, "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
+        Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
+        Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq",
+                tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID);
+        Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+        List<Long> backendIds = new ArrayList<Long>();
+        List<Integer> counts = new ArrayList<Integer>();
+        for (Map.Entry<Long, Integer> item:backendIdBucketCountMap.entrySet()) {
+            backendIds.add(item.getKey());
+            counts.add(item.getValue());
+        }
+        Assert.assertTrue(backendIds.get(0) == 0);
+        Assert.assertTrue(counts.get(0) == 0);
+        Assert.assertTrue(backendIds.get(1) == 1);
+        Assert.assertTrue(counts.get(1) == 1);
+    }
 }
 