diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 60707a04f8..a69d4ff226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -977,24 +977,14 @@ public class Coordinator { continue; } + int parallelExecInstanceNum = fragment.getParallelExecNum(); //for ColocateJoin fragment if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) { - Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()); - for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, params); - - Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue(); - for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRanges.entrySet()) { - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); - } - - params.instanceExecParams.add(instanceParam); - } + computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); } else { // case A Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator(); - int parallelExecInstanceNum = fragment.getParallelExecNum(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); TNetworkAddress key = (TNetworkAddress) entry.getKey(); @@ -1103,6 +1093,61 @@ public class Coordinator { return 1; } + /** + * Builds the instance exec params of a colocate join fragment: the per-bucket scan ranges in + * bucketSeqToScanRange are grouped by the address each bucket is mapped to, and the group of every + * address is split across at most parallelExecInstanceNum instances. + * + * @param fragmentId key used to look up the bucket-to-address mapping in fragmentIdToSeqToAddressMap + * @param parallelExecInstanceNum upper bound for the instances per address; values of 1 or less request a single instance per address + * @param params fragment params whose instanceExecParams list receives the created instances + */ + private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { + Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); + + // 1. 
group the scan ranges of all buckets by the node (address) that should scan them + Map<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressToScanRanges = Maps.newHashMap(); + for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) { + TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); + Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue(); + + if (!addressToScanRanges.containsKey(address)) { + addressToScanRanges.put(address, Lists.newArrayList()); + } + addressToScanRanges.get(address).add(nodeScanRanges); + } + + for (Map.Entry<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) { + List<Map<Integer, List<TScanRangeParams>>> scanRange = addressScanRange.getValue(); + int expectedInstanceNum = 1; + if (parallelExecInstanceNum > 1) { + // the scan instance num should not be larger than the number of buckets grouped under this node + expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum); + } + + // 2. split the scan ranges of this node into the groups each instance should scan + List<List<Map<Integer, List<TScanRangeParams>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange, + expectedInstanceNum); + + // 3. construct one instanceExecParam per group and add the scan ranges it should scan + for (List<Map<Integer, List<TScanRangeParams>>> perInstanceScanRange : perInstanceScanRanges) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params); + + for (Map<Integer, List<TScanRangeParams>> nodeScanRangeMap : perInstanceScanRange) { + for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.entrySet()) { + if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + // copy: this list is extended below and must not alias the list held by bucketSeqToScanRange + instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList(nodeScanRange.getValue())); + } else { + instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + } + } + } + params.instanceExecParams.add(instanceParam); + } + } + } + // Populates scan_range_assignment_. 
// > private void computeScanRangeAssignment() throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index cb2b5d91b6..68d2675c15 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -17,38 +17,21 @@ package org.apache.doris.qe; -import mockit.Expectations; import mockit.Mocked; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Catalog; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.persist.EditLog; -import org.apache.doris.planner.DataPartition; -import org.apache.doris.planner.ExchangeNode; -import org.apache.doris.planner.OlapScanNode; -import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.planner.PlanNode; -import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; import org.apache.doris.service.FrontendOptions; -import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; -import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; -import com.google.common.collect.ImmutableMap; - import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Test; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -67,10 +50,7 @@ public class CoordinatorTest extends Coordinator { @Mocked static FrontendOptions 
frontendOptions; static Analyzer analyzer = new Analyzer(catalog, null); - static Backend backendA; - static Backend backendB; - static Backend backendC; - static Backend backendD; + public CoordinatorTest() { super(context, analyzer, planner); @@ -78,386 +58,45 @@ public class CoordinatorTest extends Coordinator { private static Coordinator coor; - @BeforeClass - public static void beforeTest() throws IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchFieldException, - SecurityException, NoSuchMethodException { - coor = new Coordinator(context, analyzer, planner); - new Expectations() { - { - editLog.logAddBackend((Backend) any); - minTimes = 0; + @Test + public void testComputeColocateJoinInstanceParam() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); - editLog.logDropBackend((Backend) any); - minTimes = 0; + // 1. set fragmentToBucketSeqToAddress in coordinator + Map bucketSeqToAddress = new HashMap<>(); + TNetworkAddress address = new TNetworkAddress(); + for (int i = 0; i < 3; i++) { + bucketSeqToAddress.put(i, address); + } + PlanFragmentId planFragmentId = new PlanFragmentId(1); + Map> fragmentToBucketSeqToAddress = new HashMap<>(); + fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); + Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); - editLog.logBackendStateChange((Backend) any); - minTimes = 0; + // 2. 
set bucketSeqToScanRange in coordinator + BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + Map> ScanRangeMap = new HashMap<>(); + ScanRangeMap.put(1, new ArrayList<>()); + for (int i = 0; i < 3; i++) { + bucketSeqToScanRange.put(i, ScanRangeMap); + } + Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange); - catalog.getEditLog(); - minTimes = 0; - result = editLog; - } - }; + FragmentExecParams params = new FragmentExecParams(null); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Assert.assertEquals(1, params.instanceExecParams.size()); - new Expectations(catalog) { - { - Catalog.getCurrentCatalog(); - minTimes = 0; - result = catalog; + params = new FragmentExecParams(null); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params); + Assert.assertEquals(2, params.instanceExecParams.size()); - Catalog.getCurrentCatalogJournalVersion(); - minTimes = 0; - result = FeConstants.meta_version; - } - }; + params = new FragmentExecParams(null); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params); + Assert.assertEquals(3, params.instanceExecParams.size()); - new Expectations(frontendOptions) { - { - FrontendOptions.getLocalHostAddress(); - minTimes = 0; - result = "127.0.0.1"; - } - }; - - FeConstants.heartbeat_interval_second = Integer.MAX_VALUE; - backendA = new Backend(0, "machineA", 0); - backendA.updateOnce(10000, 0, 0); - backendB = new Backend(1, "machineB", 0); - backendB.updateOnce(10000, 0, 0); - backendC = new Backend(2, "machineC", 0); - backendC.updateOnce(10000, 0, 0); - backendD = new Backend(3, "machineD", 0); - backendD.updateOnce(10000, 0, 0); - - // private 方法赋值 - Field field = coor.getClass().getDeclaredField("idToBackend"); - field.setAccessible(true); - Map backendMap = new HashMap(); - backendMap.put(Long.valueOf(0), backendA); - 
backendMap.put(Long.valueOf(1), backendB); - ImmutableMap idToBackendAB = ImmutableMap.copyOf(backendMap); - field.set(coor, idToBackendAB); + params = new FragmentExecParams(null); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params); + Assert.assertEquals(3, params.instanceExecParams.size()); } - - static void invokeFunction(String functionName) throws IllegalAccessException, - IllegalArgumentException, InvocationTargetException, NoSuchFieldException, - SecurityException, NoSuchMethodException { - Method method = coor.getClass().getDeclaredMethod(functionName); - method.setAccessible(true); - method.invoke(coor); - } - - static Object getField(Object object, String fieldName) throws NoSuchFieldException, - SecurityException, IllegalArgumentException, IllegalAccessException { - Field field = object.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - Object after = field.get(object); - return after; - } - - /* - * 场景1:扫描2个scanRange,每个scanRange都分布在两台机器上(MachineA,machineB) - * 返回结果: 根据调度策略,machineA和machineB各扫描1个scanRange - * - * 场景2:扫描3个scanRange,每个scanRange都分布在两台机器上(MachineA,machineB) - * 返回结果: 根据调度策略,machineA扫描2个scanRange,machineB扫描1个scanRange - * - * 场景3:扫描3个scanRange,每个scanRange分布在不同的两台机器上(分别分布在(MachineA,machineB),(MachineA,machineC) - * (MachineC,machineB)上。 - * 返回结果:根据调度策略,machineA,machineB,machineC分布扫描1个scanRange - */ - // TODO(lingbin): PALO-2051. - // Comment out these code temporatily. 
- // @Test - public void testComputeScanRangeAssignment() throws IllegalAccessException, - IllegalArgumentException, InvocationTargetException, NoSuchFieldException, - SecurityException, NoSuchMethodException { - Method method = coor.getClass().getDeclaredMethod( - "computeScanRangeAssignment", - PlanNodeId.class, - List.class, - Coordinator.FragmentScanRangeAssignment.class); - method.setAccessible(true); - int planNodeId = 2; - // 输出参数 - FragmentScanRangeAssignment assignment = coor.new FragmentScanRangeAssignment(); - // 输入参数 - List locations = new ArrayList(); - - TScanRangeLocations scanRangeLocationsA = new TScanRangeLocations(); - List listLocationsA = new ArrayList(); - listLocationsA.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineA", 10000)).setBackendId(0)); - listLocationsA.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineB", 10000)).setBackendId(1)); - scanRangeLocationsA.setLocations(listLocationsA).setScanRange(new TScanRange()); - - TScanRangeLocations scanRangeLocationsB = new TScanRangeLocations(); - List listLocationsB = new ArrayList(); - listLocationsB.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineB", 10000)).setBackendId(1)); - listLocationsB.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineC", 10000)).setBackendId(2)); - scanRangeLocationsB.setLocations(listLocationsB).setScanRange(new TScanRange()); - - TScanRangeLocations scanRangeLocationsC = new TScanRangeLocations(); - List listLocationsC = new ArrayList(); - listLocationsC.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineC", 10000)).setBackendId(2)); - listLocationsC.add((new TScanRangeLocation()) - .setServer(new TNetworkAddress("machineA", 10000)).setBackendId(0)); - scanRangeLocationsC.setLocations(listLocationsC).setScanRange(new TScanRange()); - - // 场景1: 2个scanRange - { - assignment.clear(); - locations.clear(); - locations.add(scanRangeLocationsA); - 
locations.add(scanRangeLocationsA); - // 调用函数 - method.invoke( - coor, - new PlanNodeId(planNodeId), - locations, - assignment); - // 判断返回值 - Assert.assertEquals(assignment.get(new TNetworkAddress("machineA", 10000)) - .get(planNodeId).size(), 1); - Assert.assertEquals(assignment.get(new TNetworkAddress("machineB", 10000)) - .get(planNodeId).size(), 1); - } - // 场景2: 3个scanRange,每个scan_range都分布在两台机器上(A和B) - { - assignment.clear(); - locations.clear(); - locations.add(scanRangeLocationsA); - locations.add(scanRangeLocationsA); - locations.add(scanRangeLocationsA); - - // 调用函数 - method.invoke( - coor, - new PlanNodeId(planNodeId), - locations, - assignment); - // 判断返回值 - Assert.assertEquals(assignment.get(new TNetworkAddress("machineA", 10000)) - .get(planNodeId).size(), 2); - Assert.assertEquals(assignment.get(new TNetworkAddress("machineB", 10000)) - .get(planNodeId).size(), 1); - } - // 场景3: 3个scanRange,scan_range分别分布在(A,B)(A,C)(C,B)上 - { - Field field = coor.getClass().getDeclaredField("idToBackend"); - field.setAccessible(true); - Map backendMap = new HashMap(); - backendMap.put(Long.valueOf(0), backendA); - backendMap.put(Long.valueOf(1), backendB); - backendMap.put(Long.valueOf(2), backendC); - ImmutableMap idToBackendAB = ImmutableMap.copyOf(backendMap); - field.set(coor, idToBackendAB); - - assignment.clear(); - locations.clear(); - locations.add(scanRangeLocationsA); - locations.add(scanRangeLocationsB); - locations.add(scanRangeLocationsC); - // 调用函数 - method.invoke( - coor, - new PlanNodeId(planNodeId), - locations, - assignment); - // 判断返回值 - Assert.assertEquals(assignment.get(new TNetworkAddress("machineA", 10000)) - .get(planNodeId).size(), 1); - Assert.assertEquals(assignment.get(new TNetworkAddress("machineB", 10000)) - .get(planNodeId).size(), 1); - Assert.assertEquals(assignment.get(new TNetworkAddress("machineC", 10000)) - .get(planNodeId).size(), 1); - } - } - /* - * 场景1:扫描UNPARTITIONED的fragment - * 返回结果: fragment执行参数的host列表为随机分配fragment - * - * 
场景2:fragment的最左节点为ScanNode - * 返回结果:fragment执行参数的host列表为ScanNode的host列表 - * - * 场景3:fragment的最左节点为非ScanNode - * 返回结果:fragment执行参数的host列表为来源fragment的执行参数的host列表 - */ - // TODO(lingbin): PALO-2051. - // Comment out these code temporatily. - // @Test - public void testcomputeFragmentHosts() throws NoSuchMethodException, SecurityException, - IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchFieldException { - Method method = coor.getClass().getDeclaredMethod( - "computeFragmentHosts"); - method.setAccessible(true); - // 场景1:UNPARTITIONED - { - PlanFragment fragment = new PlanFragment(new PlanFragmentId(1), - new OlapScanNode(new PlanNodeId(1), new TupleDescriptor(new TupleId(10)), "null scanNode"), - DataPartition.UNPARTITIONED ); - List privateFragments = - (ArrayList) getField(coor, "fragments"); - Map privateFragmentExecParams = - (HashMap) getField( - coor, "fragmentExecParams"); - privateFragments.clear(); - privateFragments.add(fragment); - privateFragmentExecParams.put(new PlanFragmentId(1), new FragmentExecParams(fragment)); - - // 调用函数 - method.invoke(coor); - // 判断返回值 - // Assert.assertEquals(privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(0).hostname, "machineA"); - // Assert.assertEquals(privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(0).port, 10000); - } - // 场景2: ScanNode - { - PlanFragment fragment = new PlanFragment(new PlanFragmentId(1), - new OlapScanNode(new PlanNodeId(1), new TupleDescriptor(new TupleId(10)), "null scanNode"), - DataPartition.RANDOM ); - List privateFragments = - (ArrayList) getField(coor, "fragments"); - Map privateFragmentExecParams = - (HashMap) getField( - coor, "fragmentExecParams"); - Map privateScanRangeAssignment = - (HashMap) getField( - coor, "scanRangeAssignment"); - FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); - assignment.put(new TNetworkAddress("machineC", 10000), null); - assignment.put(new 
TNetworkAddress("machineD", 10000), null); - - privateScanRangeAssignment.put(new PlanFragmentId(1), assignment); - - privateFragments.clear(); - privateFragments.add(fragment); - privateFragmentExecParams.put(new PlanFragmentId(1), new FragmentExecParams(fragment)); - - // 调用函数 - method.invoke(coor); - // 判断返回值 - // Assert.assertEquals(2, privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.size()); - // String hostname1 = privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(0).hostname; - // String hostname2 = privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(1).hostname; - // Assert.assertTrue(hostname1.equals("machineC") || hostname1.equals("machineD")); - // Assert.assertTrue(hostname2.equals("machineC") || hostname1.equals("machineD")); - // Assert.assertFalse(hostname1.equals(hostname2)); - } - // 场景3: 非ScanNode - { - /* fragmentFather(UNPARITIONED) fragmentID=0 - * exchangeNode - * - * fragmentSon(RANDOM) fragmentID=1 - * olapScannode - * */ - - Field field = coor.getClass().getDeclaredField("idToBackend"); - field.setAccessible(true); - Map backendMap = new HashMap(); - backendMap.put(Long.valueOf(0), backendA); - backendMap.put(Long.valueOf(1), backendB); - backendMap.put(Long.valueOf(2), backendC); - backendMap.put(Long.valueOf(3), backendD); - ImmutableMap idToBackendAB = ImmutableMap.copyOf(backendMap); - field.set(coor, idToBackendAB); - - PlanNode olapNode = new OlapScanNode(new PlanNodeId(1), new TupleDescriptor(new TupleId(10)), - "null scanNode"); - PlanFragment fragmentFather = new PlanFragment(new PlanFragmentId(0), - new ExchangeNode(new PlanNodeId(10), olapNode, false), - DataPartition.UNPARTITIONED); - PlanFragment fragmentSon = new PlanFragment(new PlanFragmentId(1), - olapNode, - DataPartition.RANDOM ); - // fragmentSon.setDestination(fragmentFather, new PlanNodeId(10)); - - List privateFragments = (ArrayList) getField( - coor, "fragments"); - Map privateFragmentExecParams = - (HashMap) 
getField( - coor, "fragmentExecParams"); - Map privateScanRangeAssignment = - (HashMap) getField( - coor, "scanRangeAssignment"); - - FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); - assignment.put(new TNetworkAddress("machineC", 10000), null); - assignment.put(new TNetworkAddress("machineD", 10000), null); - privateScanRangeAssignment.put(new PlanFragmentId(1), assignment); - - privateFragments.clear(); - privateFragments.add(fragmentFather); - privateFragments.add(fragmentSon); - - privateFragmentExecParams.put(new PlanFragmentId(1), - new FragmentExecParams(fragmentSon)); - privateFragmentExecParams.put(new PlanFragmentId(0), - new FragmentExecParams(fragmentFather)); - - // 调用函数 - method.invoke(coor); - // 判断返回值 - // Assert.assertEquals(privateFragmentExecParams.get(new PlanFragmentId(0)) - // .hosts.get(0).hostname, "machineB"); - // Assert.assertEquals(privateFragmentExecParams.get(new PlanFragmentId(0)) - // .hosts.get(0).port, 10000); - // Assert.assertEquals(2, privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.size()); - // String hostname1 = privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(0).hostname; - // String hostname2 = privateFragmentExecParams.get(new PlanFragmentId(1)) - // .hosts.get(1).hostname; - // Assert.assertTrue(hostname1.equals("machineC") || hostname1.equals("machineD")); - // Assert.assertTrue(hostname2.equals("machineC") || hostname2.equals("machineD")); - // Assert.assertFalse(hostname1.equals(hostname2)); - } - } - - /* - public void testNetworkException() throws TException, NoSuchFieldException, - SecurityException, IllegalArgumentException, IllegalAccessException, - NoSuchMethodException, InvocationTargetException { - Map privateFragmentExecParams = - (HashMap) getField( - coor, "fragmentExecParams"); - ConcurrentMap privateBackendExecStateMap = - (ConcurrentMap) getField(coor, "backendExecStateMap"); - TQueryOptions privateQueryOptions = (TQueryOptions) 
getField(coor, "queryOptions"); - // 设置超时时间为2s,尽快返回连接超时 - privateQueryOptions.setQueryTimeout(2); - // Configure.qe_query_timeout_s = 2; - - privateFragmentExecParams.clear(); - privateFragmentExecParams.put(new PlanFragmentId(23), new FragmentExecParams(null)); - // privateFragmentExecParams.get(new PlanFragmentId(23)).hosts.add( - // new TNetworkAddress("machine", 10000)); - privateBackendExecStateMap.put(new TUniqueId(11, 12), coor.new BackendExecState( - new PlanFragmentId(23), 0, 0, new TExecPlanFragmentParams(), - new HashMap())); - // 调用函数 - boolean isException = false; - try { - privateBackendExecStateMap.get(new TUniqueId(11, 12)).execRemoteFragment(); - } catch (org.apache.thrift.transport.TTransportException e) { - isException = true; - } catch (Exception e) { - isException = false; - } - Assert.assertTrue("need get the TTransportException", isException); - } - */ }