diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dac9be06b9..19ca105046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2107,10 +2107,11 @@ public class Coordinator implements CoordInterface { if ((isColocateFragment(fragment, fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) { - computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); + computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params, + fragment.hasNullAwareLeftAntiJoin()); } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) { bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), - parallelExecInstanceNum, params); + parallelExecInstanceNum, params, fragment.hasNullAwareLeftAntiJoin()); } else { // case A for (Entry>> entry : fragmentExecParamsMap.get( @@ -2135,7 +2136,8 @@ public class Coordinator implements CoordInterface { int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() + && !fragment.hasNullAwareLeftAntiJoin() && useNereids; boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size())) && useNereids); @@ -2304,9 +2306,9 @@ public class Coordinator implements CoordInterface { } private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } private Map getReplicaNumPerHostForOlapTable() { @@ -3049,16 +3051,16 @@ public class Coordinator implements CoordInterface { } private void computeInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } } private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params, Map fragmentIdBucketSeqToScanRangeMap, Map> curFragmentIdToSeqToAddressMap, - Map> fragmentIdToScanNodeIds) { + Map> fragmentIdToScanNodeIds, boolean hasNullAwareLeftAntiJoin) { Map bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); Set scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); @@ -3092,7 +3094,7 @@ public class Coordinator implements CoordInterface { * 2. Use Nereids planner. */ boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin && useNereids; boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, addressToBackendID.size())) diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 4c38ddd274..9b3ed7d311 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -122,7 +122,7 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); // check whether one instance have 3 tablet to scan @@ -133,15 +133,15 @@ public class CoordinatorTest extends Coordinator { } params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params, false); Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -323,7 +323,7 @@ public class CoordinatorTest extends Coordinator { PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); StringBuilder sb = new StringBuilder(); params.appendTo(sb); Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); @@ -451,19 +451,19 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params, false); Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -505,7 +505,7 @@ public class CoordinatorTest extends Coordinator { new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); StringBuilder sb = new StringBuilder(); params.appendTo(sb);