[Enhancement] Fill assignment param of bucket shuffle and colocate shuffle for debug (#5167)

When Doris is in debug mode, function `Coordinator#traceInstance` is used to print
the physical execute plan of a fragment instance for debug.
Function  `Coordinator#traceInstance` uses param `scanRangeAssignment` to print
the detail of a fragment. But bucket shuffle join and colocate shuffle join do not fill the param.
That will cause debug not work well.
This path fill assignment param of bucket shuffle and colocate shuffle for debug.
This commit is contained in:
xinghuayu007
2021-01-16 21:37:33 +08:00
committed by GitHub
parent 99b22c92f8
commit 3dcbbbea95
2 changed files with 102 additions and 5 deletions

View File

@ -1183,9 +1183,10 @@ public class Coordinator {
}
addressToScanRanges.get(address).add(filteredNodeScanRanges);
}
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Map<Integer, List<TScanRangeParams>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<Integer, List<TScanRangeParams>>());
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
@ -1200,13 +1201,17 @@ public class Coordinator {
for (List<Map<Integer, List<TScanRangeParams>>> perInstanceScanRange : perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
for (Map<Integer, List<TScanRangeParams>> nodeScanRangeMap : perInstanceScanRange) {
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), nodeScanRange.getValue());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
} else {
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
}
}
}
params.instanceExecParams.add(instanceParam);
@ -1624,7 +1629,6 @@ public class Coordinator {
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
@ -1639,9 +1643,10 @@ public class Coordinator {
}
addressToScanRanges.get(address).add(filteredScanRanges);
}
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<Integer, List<TScanRangeParams>>());
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
@ -1660,8 +1665,10 @@ public class Coordinator {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), nodeScanRange.getValue());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
} else {
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
}
}

View File

@ -261,6 +261,46 @@ public class CoordinatorTest extends Coordinator {
Assert.assertEquals(targetBeCount, 3);
}
@Test
public void testColocateJoinAssignment() {
Coordinator coordinator = new Coordinator(context, analyzer, planner);
PlanFragmentId planFragmentId = new PlanFragmentId(1);
int scanNodeId = 1;
Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds);
// 1. set fragmentToBucketSeqToAddress in coordinator
Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
TNetworkAddress address = new TNetworkAddress();
for (int i = 0; i < 3; i++) {
bucketSeqToAddress.put(i, address);
}
Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
// 2. set bucketSeqToScanRange in coordinator
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
ScanRangeMap.put(scanNodeId, new ArrayList<>());
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange);
TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test");
PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params);
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
}
@Test
public void testComputeScanRangeAssignmentByBucket() {
PlanFragmentId planFragmentId = new PlanFragmentId(1);
@ -375,11 +415,16 @@ public class CoordinatorTest extends Coordinator {
fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
Assert.assertEquals(1, params.instanceExecParams.size());
try {
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
System.out.println(sb);
} catch (Exception e) {
e.printStackTrace();
}
params = new FragmentExecParams(null);
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
Assert.assertEquals(2, params.instanceExecParams.size());
@ -393,6 +438,51 @@ public class CoordinatorTest extends Coordinator {
Assert.assertEquals(3, params.instanceExecParams.size());
}
@Test
public void testBucketShuffleAssignment() {
PlanFragmentId planFragmentId = new PlanFragmentId(1);
int scanNodeId = 1;
// set fragment id to scan node ids map
Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);
Coordinator.BucketShuffleJoinController bucketShuffleJoinController
= new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds);
// 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController
Map<Integer, TNetworkAddress> bucketSeqToAddress = new HashMap<>();
TNetworkAddress address = new TNetworkAddress();
for (int i = 0; i < 3; i++) {
bucketSeqToAddress.put(i, address);
}
Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentToBucketSeqToAddress = new HashMap<>();
fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress);
Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress);
// 2. set bucketSeqToScanRange in bucketShuffleJoinController
Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
ScanRangeMap.put(scanNodeId, new ArrayList<>());
for (int i = 0; i < 3; i++) {
bucketSeqToScanRange.put(i, ScanRangeMap);
}
fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test");
PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
Assert.assertEquals(1, params.instanceExecParams.size());
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
}
@Test
public void testComputeScanRangeAssignmentByScheduler() {
Coordinator coordinator = new Coordinator(context, analyzer, planner);