From 3dcbbbea956c80a91b5e0708aeedc938dd2d2d1d Mon Sep 17 00:00:00 2001 From: xinghuayu007 <1450306854@qq.com> Date: Sat, 16 Jan 2021 21:37:33 +0800 Subject: [PATCH] [Enhancement] Fill assignment param of bucket shuffle and colocate shuffle for debug (#5167) When Doris is in debug mode, function `Coordinator#traceInstance` is used to print the physical execute plan of a fragment instance for debug. Function `Coordinator#traceInstance` uses param `scanRangeAssignment` to print the detail of a fragment. But bucket shuffle join and colocate shuffle join do not fill the param. That will cause debug not work well. This path fill assignment param of bucket shuffle and colocate shuffle for debug. --- .../java/org/apache/doris/qe/Coordinator.java | 13 ++- .../org/apache/doris/qe/CoordinatorTest.java | 94 ++++++++++++++++++- 2 files changed, 102 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 558af53c07..0cd3f0ca10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1183,9 +1183,10 @@ public class Coordinator { } addressToScanRanges.get(address).add(filteredNodeScanRanges); } - + FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>> addressScanRange : addressToScanRanges.entrySet()) { List>> scanRange = addressScanRange.getValue(); + Map> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap>()); int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1200,13 +1201,17 @@ public class Coordinator { for (List>> perInstanceScanRange : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params); + for (Map> nodeScanRangeMap : perInstanceScanRange) { for (Map.Entry> nodeScanRange : nodeScanRangeMap.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); } else { + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } + } } params.instanceExecParams.add(instanceParam); @@ -1624,7 +1629,6 @@ public class Coordinator { for (Map.Entry>> scanRanges : bucketSeqToScanRange.entrySet()) { TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); Map> nodeScanRanges = scanRanges.getValue(); - // We only care about the node scan ranges of scan nodes which belong to this fragment Map> filteredNodeScanRanges = Maps.newHashMap(); for (Integer scanNodeId : nodeScanRanges.keySet()) { @@ -1639,9 +1643,10 @@ public class Coordinator { } addressToScanRanges.get(address).add(filteredScanRanges); } - + FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>>> addressScanRange : addressToScanRanges.entrySet()) { List>>> scanRange = addressScanRange.getValue(); + Map> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap>()); int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1660,8 +1665,10 @@ public class Coordinator { instanceParam.addBucketSeq(nodeScanRangeMap.first); for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); } else { + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 40b75682e1..ea4adfab5c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -261,6 +261,46 @@ public class CoordinatorTest extends Coordinator { Assert.assertEquals(targetBeCount, 3); } + @Test + public void testColocateJoinAssignment() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); + + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds); + + // 1. set fragmentToBucketSeqToAddress in coordinator + Map bucketSeqToAddress = new HashMap<>(); + TNetworkAddress address = new TNetworkAddress(); + for (int i = 0; i < 3; i++) { + bucketSeqToAddress.put(i, address); + } + Map> fragmentToBucketSeqToAddress = new HashMap<>(); + fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); + Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); + + // 2. set bucketSeqToScanRange in coordinator + BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + Map> ScanRangeMap = new HashMap<>(); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); + for (int i = 0; i < 3; i++) { + bucketSeqToScanRange.put(i, ScanRangeMap); + } + Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange); + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); + PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, + new DataPartition(TPartitionType.UNPARTITIONED)); + FragmentExecParams params = new FragmentExecParams(fragment); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); + } + @Test public void testComputeScanRangeAssignmentByBucket() { PlanFragmentId planFragmentId = new PlanFragmentId(1); @@ -375,11 +415,16 @@ public class CoordinatorTest extends Coordinator { fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); - FragmentExecParams params = new FragmentExecParams(null); Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); Assert.assertEquals(1, params.instanceExecParams.size()); - + try { + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + System.out.println(sb); + } catch (Exception e) { + e.printStackTrace(); + } params = new FragmentExecParams(null); Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params); Assert.assertEquals(2, params.instanceExecParams.size()); @@ -393,6 +438,51 @@ public class CoordinatorTest extends Coordinator { Assert.assertEquals(3, params.instanceExecParams.size()); } + @Test + public void testBucketShuffleAssignment() { + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + + // set fragment id to scan node ids map + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); + + // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController + Map bucketSeqToAddress = new HashMap<>(); + TNetworkAddress address = new TNetworkAddress(); + for (int i = 0; i < 3; i++) { + bucketSeqToAddress.put(i, address); + } + Map> fragmentToBucketSeqToAddress = new HashMap<>(); + fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); + Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); + + // 2. set bucketSeqToScanRange in bucketShuffleJoinController + Map fragmentIdBucketSeqToScanRangeMap = new HashMap<>(); + BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + Map> ScanRangeMap = new HashMap<>(); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); + for (int i = 0; i < 3; i++) { + bucketSeqToScanRange.put(i, ScanRangeMap); + } + fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); + Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); + PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, + new DataPartition(TPartitionType.UNPARTITIONED)); + + FragmentExecParams params = new FragmentExecParams(fragment); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Assert.assertEquals(1, params.instanceExecParams.size()); + StringBuilder sb = new StringBuilder(); + params.appendTo(sb); + Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); + } + @Test public void testComputeScanRangeAssignmentByScheduler() { Coordinator coordinator = new Coordinator(context, analyzer, planner);