[Feature](Nereids) unnest subquery in 'not in' predicate into NULL AWARE ANTI JOIN (#15230)

when we process not in subquery. if the subquery return column is nullable, we need a NULL AWARE ANTI JOIN instead of ANTI JOIN.
Doris already support NULL AWARE ANTI JOIN in PR #13871
Nereids need to do that so.
This commit is contained in:
zhengshiJ
2022-12-22 14:13:47 +08:00
committed by GitHub
parent 87756f5441
commit a87f905a2d
17 changed files with 132 additions and 11 deletions

View File

@ -745,7 +745,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
TupleDescriptor intermediateDescriptor = context.generateTupleDesc();
if (hashJoin.getOtherJoinConjuncts().isEmpty()
&& (joinType == JoinType.LEFT_ANTI_JOIN || joinType == JoinType.LEFT_SEMI_JOIN)) {
&& (joinType == JoinType.LEFT_ANTI_JOIN
|| joinType == JoinType.LEFT_SEMI_JOIN
|| joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN)) {
for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
if (!leftSlotDescriptor.isMaterialized()) {
continue;
@ -903,7 +905,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.collect(Collectors.toList());
if (nestedLoopJoinNode.getConjuncts().isEmpty()
&& (joinType == JoinType.LEFT_ANTI_JOIN || joinType == JoinType.LEFT_SEMI_JOIN)) {
&& (joinType == JoinType.LEFT_ANTI_JOIN
|| joinType == JoinType.LEFT_SEMI_JOIN
|| joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN)) {
for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
if (!leftSlotDescriptor.isMaterialized()) {
continue;

View File

@ -55,7 +55,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
private static final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
JoinType.LEFT_ANTI_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_OUTER_JOIN
JoinType.LEFT_OUTER_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN
);
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();

View File

@ -68,11 +68,18 @@ public class JoinCommute extends OneExplorationRuleFactory {
LEFT_DEEP, ZIG_ZAG, BUSHY
}
/**
* Check if commutative law needs to be enforced.
*/
public static boolean check(SwapType swapType, LogicalJoin<GroupPlan, GroupPlan> join) {
if (swapType == SwapType.LEFT_DEEP && isNotBottomJoin(join)) {
return false;
}
if (join.getJoinType().isNullAwareLeftAntiJoin()) {
return false;
}
return !join.getJoinReorderContext().hasCommute() && !join.getJoinReorderContext().hasExchange();
}

View File

@ -55,7 +55,8 @@ public class SemiJoinLogicalJoinTranspose extends OneExplorationRuleFactory {
public Rule build() {
return logicalJoin(logicalJoin(), group())
.when(topJoin -> topJoin.getJoinType() == JoinType.LEFT_SEMI_JOIN
|| topJoin.getJoinType() == JoinType.LEFT_ANTI_JOIN)
|| topJoin.getJoinType() == JoinType.LEFT_ANTI_JOIN
|| topJoin.getJoinType() == JoinType.NULL_AWARE_LEFT_ANTI_JOIN)
.whenNot(topJoin -> topJoin.left().getJoinType().isSemiOrAntiJoin())
.when(this::conditionChecker)
.whenNot(topJoin -> topJoin.hasJoinHint() || topJoin.left().hasJoinHint())

View File

@ -57,7 +57,8 @@ public class SemiJoinLogicalJoinTransposeProject extends OneExplorationRuleFacto
public Rule build() {
return logicalJoin(logicalProject(logicalJoin()), group())
.when(topJoin -> topJoin.getJoinType() == JoinType.LEFT_SEMI_JOIN
|| topJoin.getJoinType() == JoinType.LEFT_ANTI_JOIN)
|| topJoin.getJoinType() == JoinType.LEFT_ANTI_JOIN
|| topJoin.getJoinType() == JoinType.NULL_AWARE_LEFT_ANTI_JOIN)
.whenNot(topJoin -> topJoin.left().child().getJoinType().isSemiOrAntiJoin())
.whenNot(join -> join.hasJoinHint() || join.left().child().hasJoinHint())
.when(this::conditionChecker)

View File

@ -44,7 +44,12 @@ public class SemiJoinSemiJoinTranspose extends OneExplorationRuleFactory {
Pair.of(JoinType.LEFT_SEMI_JOIN, JoinType.LEFT_SEMI_JOIN),
Pair.of(JoinType.LEFT_ANTI_JOIN, JoinType.LEFT_ANTI_JOIN),
Pair.of(JoinType.LEFT_SEMI_JOIN, JoinType.LEFT_ANTI_JOIN),
Pair.of(JoinType.LEFT_ANTI_JOIN, JoinType.LEFT_SEMI_JOIN));
Pair.of(JoinType.LEFT_ANTI_JOIN, JoinType.LEFT_SEMI_JOIN),
Pair.of(JoinType.NULL_AWARE_LEFT_ANTI_JOIN, JoinType.NULL_AWARE_LEFT_ANTI_JOIN),
Pair.of(JoinType.NULL_AWARE_LEFT_ANTI_JOIN, JoinType.LEFT_SEMI_JOIN),
Pair.of(JoinType.NULL_AWARE_LEFT_ANTI_JOIN, JoinType.LEFT_ANTI_JOIN),
Pair.of(JoinType.LEFT_SEMI_JOIN, JoinType.NULL_AWARE_LEFT_ANTI_JOIN),
Pair.of(JoinType.LEFT_ANTI_JOIN, JoinType.NULL_AWARE_LEFT_ANTI_JOIN));
/*
* topJoin newTopJoin

View File

@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
/**
* Convert InApply to LogicalJoin.
* <p>
* Not In -> LEFT_ANTI_JOIN
* Not In -> NULL_AWARE_LEFT_ANTI_JOIN
* In -> LEFT_SEMI_JOIN
*/
public class InApplyToJoin extends OneRewriteRuleFactory {
@ -53,7 +53,7 @@ public class InApplyToJoin extends OneRewriteRuleFactory {
}
if (((InSubquery) apply.getSubqueryExpr()).isNot()) {
return new LogicalJoin<>(JoinType.LEFT_ANTI_JOIN, Lists.newArrayList(),
return new LogicalJoin<>(JoinType.NULL_AWARE_LEFT_ANTI_JOIN, Lists.newArrayList(),
ExpressionUtils.extractConjunction(predicate),
JoinHint.NONE,
apply.left(), apply.right());

View File

@ -69,6 +69,7 @@ public class InferPredicates extends DefaultPlanRewriter<JobContext> {
break;
case LEFT_OUTER_JOIN:
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
otherJoinConjuncts.addAll(inferNewPredicate(right, expressions));
break;
case RIGHT_OUTER_JOIN:

View File

@ -91,6 +91,7 @@ public class PullUpPredicates extends PlanVisitor<ImmutableSet<Expression>, Void
break;
case LEFT_OUTER_JOIN:
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
predicates.addAll(leftPredicates);
break;
case RIGHT_OUTER_JOIN:

View File

@ -47,6 +47,7 @@ public class PushdownFilterThroughJoin extends OneRewriteRuleFactory {
JoinType.LEFT_OUTER_JOIN,
JoinType.LEFT_SEMI_JOIN,
JoinType.LEFT_ANTI_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN,
JoinType.CROSS_JOIN
);

View File

@ -50,6 +50,7 @@ public class PushdownJoinOtherCondition extends OneRewriteRuleFactory {
JoinType.INNER_JOIN,
JoinType.LEFT_OUTER_JOIN,
JoinType.LEFT_ANTI_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN,
JoinType.LEFT_SEMI_JOIN,
JoinType.RIGHT_SEMI_JOIN,
JoinType.CROSS_JOIN

View File

@ -98,7 +98,9 @@ public class JoinEstimation {
public static StatsDeriveResult estimate(StatsDeriveResult leftStats, StatsDeriveResult rightStats, Join join) {
JoinType joinType = join.getJoinType();
double rowCount = Double.MAX_VALUE;
if (joinType == JoinType.LEFT_SEMI_JOIN || joinType == JoinType.LEFT_ANTI_JOIN) {
if (joinType == JoinType.LEFT_SEMI_JOIN
|| joinType == JoinType.LEFT_ANTI_JOIN
|| joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) {
double rightCount = rightStats.getRowCount();
double leftCount = leftStats.getRowCount();
if (join.getHashJoinConjuncts().isEmpty()) {

View File

@ -146,6 +146,10 @@ public enum JoinType {
return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN && this != NULL_AWARE_LEFT_ANTI_JOIN;
}
public final boolean isNullAwareLeftAntiJoin() {
return this == NULL_AWARE_LEFT_ANTI_JOIN;
}
public final boolean isSwapJoinType() {
return joinSwapMap.containsKey(this);
}

View File

@ -174,6 +174,7 @@ public class LogicalJoin<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE extends
switch (joinType) {
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
return ImmutableList.copyOf(left().getOutput());
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:

View File

@ -51,8 +51,8 @@ import java.util.stream.Collectors;
*/
public class JoinUtils {
public static boolean couldShuffle(Join join) {
// Cross-join only can be broadcast join.
return !(join.getJoinType().isCrossJoin());
// Cross-join and Null-Aware-Left-Anti-Join only can be broadcast join.
return !(join.getJoinType().isCrossJoin()) && !(join.getJoinType().isNullAwareLeftAntiJoin());
}
public static boolean couldBroadcast(Join join) {

View File

@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
2
-- !select --
\N
2
-- !select --
-- !select --

View File

@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_nereids_null_aware_left_anti_join") {
def tableName1 = "test_null_aware_left_anti_join1"
def tableName2 = "test_null_aware_left_anti_join2"
sql """
drop table if exists ${tableName1};
"""
sql """
drop table if exists ${tableName2};
"""
sql """
create table if not exists ${tableName1} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1");
"""
sql """
create table if not exists ${tableName2} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1");
"""
sql """
insert into ${tableName1} values (1), (3);
"""
sql """
insert into ${tableName2} values (1), (2);
"""
sql "SET enable_nereids_planner=true"
sql "SET enable_vectorized_engine=true"
sql "SET enable_fallback_to_original_planner=false"
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
sql "SET enable_fallback_to_original_planner=true"
sql """
insert into ${tableName2} values(null);
"""
sql "SET enable_fallback_to_original_planner=false"
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
sql "SET enable_fallback_to_original_planner=true"
sql """
insert into ${tableName1} values(null);
"""
sql "SET enable_fallback_to_original_planner=false"
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
sql "SET enable_fallback_to_original_planner=true"
sql """ set parallel_fragment_exec_instance_num=2; """
sql "SET enable_fallback_to_original_planner=false"
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
sql "SET enable_fallback_to_original_planner=true"
sql """
drop table if exists ${tableName2};
"""
sql """
drop table if exists ${tableName1};
"""
}