diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 558af53c07..0cd3f0ca10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1183,9 +1183,10 @@ public class Coordinator { } addressToScanRanges.get(address).add(filteredNodeScanRanges); } - + FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>> addressScanRange : addressToScanRanges.entrySet()) { List>> scanRange = addressScanRange.getValue(); + Map> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap>()); int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1200,13 +1201,17 @@ public class Coordinator { for (List>> perInstanceScanRange : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params); + for (Map> nodeScanRangeMap : perInstanceScanRange) { for (Map.Entry> nodeScanRange : nodeScanRangeMap.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); } else { + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } + } } params.instanceExecParams.add(instanceParam); @@ -1624,7 +1629,6 @@ public class Coordinator { for (Map.Entry>> scanRanges : bucketSeqToScanRange.entrySet()) { TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); Map> nodeScanRanges = scanRanges.getValue(); - // We only care about the node scan ranges of scan nodes which belong to this fragment Map> filteredNodeScanRanges = Maps.newHashMap(); for (Integer scanNodeId : nodeScanRanges.keySet()) { @@ -1639,9 +1643,10 @@ public class Coordinator { } addressToScanRanges.get(address).add(filteredScanRanges); } - + FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>>> addressScanRange : addressToScanRanges.entrySet()) { List>>> scanRange = addressScanRange.getValue(); + Map> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap>()); int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1660,8 +1665,10 @@ public class Coordinator { instanceParam.addBucketSeq(nodeScanRangeMap.first); for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); } else { + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 40b75682e1..ea4adfab5c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -261,6 +261,46 @@ public class CoordinatorTest extends Coordinator { Assert.assertEquals(targetBeCount, 3); } + @Test + public void testColocateJoinAssignment() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); + + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds); + + // 1. set fragmentToBucketSeqToAddress in coordinator + Map bucketSeqToAddress = new HashMap<>(); + TNetworkAddress address = new TNetworkAddress(); + for (int i = 0; i < 3; i++) { + bucketSeqToAddress.put(i, address); + } + Map> fragmentToBucketSeqToAddress = new HashMap<>(); + fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); + Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); + + // 2. set bucketSeqToScanRange in coordinator + BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + Map> ScanRangeMap = new HashMap<>(); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); + for (int i = 0; i < 3; i++) { + bucketSeqToScanRange.put(i, ScanRangeMap); + } + Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange); + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); + PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, + new DataPartition(TPartitionType.UNPARTITIONED)); + FragmentExecParams params = new FragmentExecParams(fragment); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); + } + @Test public void testComputeScanRangeAssignmentByBucket() { PlanFragmentId planFragmentId = new PlanFragmentId(1); @@ -375,11 +415,16 @@ public class CoordinatorTest extends Coordinator { fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); - FragmentExecParams params = new FragmentExecParams(null); Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); Assert.assertEquals(1, params.instanceExecParams.size()); - + try { + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + System.out.println(sb); + } catch (Exception e) { + e.printStackTrace(); + } params = new FragmentExecParams(null); Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params); Assert.assertEquals(2, params.instanceExecParams.size()); @@ -393,6 +438,51 @@ public class CoordinatorTest extends Coordinator { Assert.assertEquals(3, params.instanceExecParams.size()); } + @Test + public void testBucketShuffleAssignment() { + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + + // set fragment id to scan node ids map + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); + + // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController + Map bucketSeqToAddress = new HashMap<>(); + TNetworkAddress address = new TNetworkAddress(); + for (int i = 0; i < 3; i++) { + bucketSeqToAddress.put(i, address); + } + Map> fragmentToBucketSeqToAddress = new HashMap<>(); + fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); + Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); + + // 2. set bucketSeqToScanRange in bucketShuffleJoinController + Map fragmentIdBucketSeqToScanRangeMap = new HashMap<>(); + BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + Map> ScanRangeMap = new HashMap<>(); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); + for (int i = 0; i < 3; i++) { + bucketSeqToScanRange.put(i, ScanRangeMap); + } + fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); + Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); + PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, + new DataPartition(TPartitionType.UNPARTITIONED)); + + FragmentExecParams params = new FragmentExecParams(fragment); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Assert.assertEquals(1, params.instanceExecParams.size()); + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); + } + @Test public void testComputeScanRangeAssignmentByScheduler() { Coordinator coordinator = new Coordinator(context, analyzer, planner);