From 7bdd854fdcc7136b5668a26b8ba67fc61ba6b576 Mon Sep 17 00:00:00 2001 From: starocean999 <40539150+starocean999@users.noreply.github.com> Date: Fri, 24 Mar 2023 19:21:41 +0800 Subject: [PATCH] [fix](nereids) bucket shuffle and colocate join are not correctly recognized (#17807) 1. closes (https://github.com/apache/doris/issues/16458) for nereids 2. varchar and string types should be treated as the same type in the bucket shuffle join scenario. ``` create table shuffle_join_t1 ( a varchar(10) not null ) create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null ) ``` the two SQLs below can use bucket shuffle join ``` select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a; select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b; ``` 3. PushdownExpressionsInHashCondition should consider both hash conjuncts and other conjuncts 4. visitPhysicalProject should handle MarkJoinSlotReference --- .../java/org/apache/doris/catalog/Type.java | 5 + .../translator/PhysicalPlanTranslator.java | 13 ++ .../ChildrenPropertiesRegulator.java | 2 +- .../properties/DistributionSpecHash.java | 38 +++- .../LogicalOlapScanToPhysicalOlapScan.java | 4 +- .../PushdownExpressionsInHashCondition.java | 87 ++++---- .../trees/plans/logical/LogicalJoin.java | 7 + .../trees/plans/logical/LogicalOlapScan.java | 7 +- .../doris/nereids/types/StringType.java | 2 +- .../doris/nereids/types/VarcharType.java | 2 +- .../apache/doris/nereids/util/JoinUtils.java | 9 +- .../doris/nereids/util/TypeCoercionUtils.java | 8 +- .../doris/planner/DistributedPlanner.java | 6 +- .../postprocess/RuntimeFilterTest.java | 2 +- .../logical/PruneOlapScanPartitionTest.java | 2 + .../logical/PruneOlapScanTabletTest.java | 2 + ...ushdownExpressionsInHashConditionTest.java | 19 -- .../nereids/types/AbstractDataTypeTest.java | 4 +- .../test_bucket_shuffle_join.groovy | 46 +++++ .../join/bucket_shuffle_join.groovy | 58 ++++++ .../join/colocate_join_with_rollup.groovy | 191 ++++++++++++++++++ 21 files changed, 438 insertions(+), 76 deletions(-) create mode 100644 regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java index c5767d6c34..8f991c92f3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java @@ -372,6 +372,11 @@ public abstract class Type { || isScalarType(PrimitiveType.STRING); } + public boolean isVarcharOrStringType() { + return isScalarType(PrimitiveType.VARCHAR) + || isScalarType(PrimitiveType.STRING); + } + public boolean isVarchar() { return isScalarType(PrimitiveType.VARCHAR); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index bdc463662b..6f5b71df9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1366,11 +1366,18 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor project, PlanTranslatorContext context) { + MarkJoinSlotReference markJoinSlot = null; if (project.child(0) instanceof PhysicalHashJoin) { ((PhysicalHashJoin) project.child(0)).setShouldTranslateOutput(false); + if (((PhysicalHashJoin) 
project.child(0)).getMarkJoinSlotReference().isPresent()) { + markJoinSlot = (((PhysicalHashJoin) project.child(0)).getMarkJoinSlotReference().get()); + } } if (project.child(0) instanceof PhysicalNestedLoopJoin) { ((PhysicalNestedLoopJoin) project.child(0)).setShouldTranslateOutput(false); + if (((PhysicalNestedLoopJoin) project.child(0)).getMarkJoinSlotReference().isPresent()) { + markJoinSlot = (((PhysicalNestedLoopJoin) project.child(0)).getMarkJoinSlotReference().get()); + } } if (project.child(0) instanceof PhysicalFilter) { if (project.child(0).child(0) instanceof PhysicalHashJoin) { @@ -1392,6 +1399,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor e.toSlot()) .collect(Collectors.toList()); + + if (markJoinSlot != null) { + // add mark join slot to output + slotList.add(markJoinSlot); + execExprList.add(ExpressionTranslator.translate(markJoinSlot, context)); + } // For hash join node, use vSrcToOutputSMap to describe the expression calculation, use // vIntermediateTupleDescList as input, and set vOutputTupleDesc as the final output. // TODO: HashJoinNode's be implementation is not support projection yet, remove this after when supported. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index abfc0f440d..b2343bb501 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -172,7 +172,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { rightShuffleIds.add(rightRequireSpec.getOrderedShuffledColumns().get(index)); } return new PhysicalProperties(new DistributionSpecHash(rightShuffleIds, ShuffleType.ENFORCED, - rightHashSpec.getTableId(), rightHashSpec.getPartitionIds())); + rightHashSpec.getTableId(), rightHashSpec.getSelectedIndexId(), rightHashSpec.getPartitionIds())); } private double updateChildEnforceAndCost(GroupExpression child, PhysicalProperties childOutput, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 72bf7e34df..6a7f899c2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -51,6 +51,8 @@ public class DistributionSpecHash extends DistributionSpec { private final Set partitionIds; + private final long selectedIndexId; + // use for satisfied judge private final List> equivalenceExprIds; @@ -79,14 +81,23 @@ public class DistributionSpecHash extends DistributionSpec { } /** - * Normal constructor. + * Used in ut */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, Set partitionIds) { + this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds); + } + + /** + * Normal constructor. 
+ */ + public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, + long tableId, long selectedIndexId, Set partitionIds) { this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns); this.shuffleType = Objects.requireNonNull(shuffleType); this.partitionIds = Objects.requireNonNull(partitionIds); this.tableId = tableId; + this.selectedIndexId = selectedIndexId; equivalenceExprIds = Lists.newArrayListWithCapacity(orderedShuffledColumns.size()); exprIdToEquivalenceSet = Maps.newHashMapWithExpectedSize(orderedShuffledColumns.size()); int i = 0; @@ -96,15 +107,26 @@ public class DistributionSpecHash extends DistributionSpec { } } + /** + * Used in ut + */ + public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, + long tableId, Set partitionIds, List> equivalenceExprIds, + Map exprIdToEquivalenceSet) { + this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, equivalenceExprIds, + exprIdToEquivalenceSet); + } + /** * Used in merge outside and put result into it. */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, - Set partitionIds, List> equivalenceExprIds, + long selectedIndexId, Set partitionIds, List> equivalenceExprIds, Map exprIdToEquivalenceSet) { this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns); this.shuffleType = Objects.requireNonNull(shuffleType); this.tableId = tableId; + this.selectedIndexId = selectedIndexId; this.partitionIds = Objects.requireNonNull(partitionIds); this.equivalenceExprIds = Objects.requireNonNull(equivalenceExprIds); this.exprIdToEquivalenceSet = Objects.requireNonNull(exprIdToEquivalenceSet); @@ -124,7 +146,8 @@ public class DistributionSpecHash extends DistributionSpec { exprIdToEquivalenceSet.putAll(left.getExprIdToEquivalenceSet()); exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet()); return new DistributionSpecHash(orderedShuffledColumns, shuffleType, - left.getTableId(), left.getPartitionIds(), equivalenceExprIds, exprIdToEquivalenceSet); + left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds, + exprIdToEquivalenceSet); } static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right) { @@ -143,6 +166,10 @@ public class DistributionSpecHash extends DistributionSpec { return tableId; } + public long getSelectedIndexId() { + return selectedIndexId; + } + public Set getPartitionIds() { return partitionIds; } @@ -219,7 +246,7 @@ public class DistributionSpecHash extends DistributionSpec { } public DistributionSpecHash withShuffleType(ShuffleType shuffleType) { - return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, partitionIds, + return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, equivalenceExprIds, exprIdToEquivalenceSet); } @@ -256,7 +283,7 @@ public class DistributionSpecHash extends DistributionSpec { exprIdToEquivalenceSet.put(exprIdSetKV.getKey(), exprIdSetKV.getValue()); } } - return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, partitionIds, + return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, equivalenceExprIds, exprIdToEquivalenceSet); } @@ -280,6 +307,7 @@ public class DistributionSpecHash extends DistributionSpec { "orderedShuffledColumns", orderedShuffledColumns, "shuffleType", shuffleType, "tableId", tableId, + "selectedIndexId", selectedIndexId, 
"partitionIds", partitionIds, "equivalenceExprIds", equivalenceExprIds, "exprIdToEquivalenceSet", exprIdToEquivalenceSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java index 6ecddb3bda..ea3e4c0530 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java @@ -89,8 +89,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact } } // TODO: need to consider colocate and dynamic partition and partition number - return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, - olapScan.getTable().getId(), Sets.newHashSet(olapScan.getTable().getPartitionIds())); + return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, olapScan.getTable().getId(), + olapScan.getSelectedIndexId(), Sets.newHashSet(olapScan.getSelectedPartitionIds())); } else { // RandomDistributionInfo return DistributionSpecAny.INSTANCE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java index bc92d8f37c..404b1dc22f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java @@ -22,6 +22,7 @@ import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -29,20 +30,16 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; -import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; /** * push down expression which is not slot reference @@ -67,52 +64,66 @@ public class PushdownExpressionsInHashCondition extends OneRewriteRuleFactory { public Rule build() { return logicalJoin() .when(join -> join.getHashJoinConjuncts().stream().anyMatch(equalTo -> - equalTo.children().stream().anyMatch(e -> !ExpressionUtils.checkTypeSkipCast(e, Slot.class)))) + equalTo.children().stream().anyMatch(e -> !(e instanceof Slot)))) .then(join -> { - List> exprsOfHashConjuncts = - Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList()); - Map exprMap = Maps.newHashMap(); + Set leftProjectExprs = 
Sets.newHashSet(); + Set rightProjectExprs = Sets.newHashSet(); + Map exprReplaceMap = Maps.newHashMap(); join.getHashJoinConjuncts().forEach(conjunct -> { Preconditions.checkArgument(conjunct instanceof EqualTo); // sometimes: t1 join t2 on t2.a + 1 = t1.a + 2, so check the situation, but actually it // doesn't swap the two sides. conjunct = JoinUtils.swapEqualToForChildrenOrder( (EqualTo) conjunct, join.left().getOutputSet()); - exprsOfHashConjuncts.get(0).add(conjunct.child(0)); - exprsOfHashConjuncts.get(1).add(conjunct.child(1)); - conjunct.children().forEach(expr -> { - if ((expr instanceof SlotReference)) { - exprMap.put(expr, (SlotReference) expr); - } else { - exprMap.put(expr, new Alias(expr, "expr_" + expr.toSql())); - } - }); + generateReplaceMapAndProjectExprs(conjunct.child(0), exprReplaceMap, leftProjectExprs); + generateReplaceMapAndProjectExprs(conjunct.child(1), exprReplaceMap, rightProjectExprs); }); - Iterator> iter = exprsOfHashConjuncts.iterator(); + + // add other conjuncts used slots to project exprs + Set leftExprIdSet = join.left().getOutputExprIdSet(); + join.getOtherJoinConjuncts().stream().flatMap(conjunct -> + conjunct.getInputSlots().stream() + ).forEach(slot -> { + if (leftExprIdSet.contains(slot.getExprId())) { + // belong to left child + leftProjectExprs.add(slot); + } else { + // belong to right child + rightProjectExprs.add(slot); + } + }); + + List newHashConjuncts = join.getHashJoinConjuncts().stream() + .map(equalTo -> equalTo.withChildren(equalTo.children() + .stream().map(expr -> exprReplaceMap.get(expr).toSlot()) + .collect(ImmutableList.toImmutableList()))) + .collect(ImmutableList.toImmutableList()); return join.withHashJoinConjunctsAndChildren( - join.getHashJoinConjuncts().stream() - .map(equalTo -> equalTo.withChildren(equalTo.children() - .stream().map(expr -> exprMap.get(expr).toSlot()) - .collect(ImmutableList.toImmutableList()))) - .collect(ImmutableList.toImmutableList()), - join.children().stream().map( - plan -> { - Set projectSet = Sets.newHashSet(); - projectSet.addAll(iter.next().stream().map(exprMap::get) - .collect(Collectors.toList())); - projectSet.addAll(getOutput(plan, join)); - List projectList = projectSet.stream() - .collect(ImmutableList.toImmutableList()); - return new LogicalProject<>(projectList, plan); - } - ) - .collect(ImmutableList.toImmutableList())); + newHashConjuncts, + createChildProjectPlan(join.left(), join, leftProjectExprs), + createChildProjectPlan(join.right(), join, rightProjectExprs)); }).toRule(RuleType.PUSHDOWN_EXPRESSIONS_IN_HASH_CONDITIONS); } - private List getOutput(Plan plan, LogicalJoin join) { - Set intersectionSlots = Sets.newHashSet(plan.getOutputSet()); + private LogicalProject createChildProjectPlan(Plan plan, LogicalJoin join, + Set conditionUsedExprs) { + Set intersectionSlots = Sets.newHashSet(plan.getOutputSet()); intersectionSlots.retainAll(join.getOutputSet()); - return Lists.newArrayList(intersectionSlots); + intersectionSlots.addAll(conditionUsedExprs); + return new LogicalProject(intersectionSlots.stream() + .collect(ImmutableList.toImmutableList()), plan); + } + + private void generateReplaceMapAndProjectExprs(Expression expr, Map replaceMap, + Set projects) { + if (expr instanceof SlotReference) { + projects.add((SlotReference) expr); + replaceMap.put(expr, (SlotReference) expr); + } else { + Alias alias = new Alias(expr, "expr_" + expr.toSql()); + if (replaceMap.putIfAbsent(expr, alias.toSlot()) == null) { + projects.add(alias); + } + } } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java index 1317e79a9d..319d025e6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java @@ -285,6 +285,13 @@ public class LogicalJoin withHashJoinConjunctsAndChildren( + List hashJoinConjuncts, Plan left, Plan right) { + Preconditions.checkArgument(children.size() == 2); + return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinConjuncts, hint, + markJoinSlotReference, left, right, joinReorderContext); + } + public LogicalJoin withConjunctsChildren(List hashJoinConjuncts, List otherJoinConjuncts, Plan left, Plan right) { return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinConjuncts, hint, markJoinSlotReference, left, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index 79a128cd8f..ed7fa920e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -143,14 +144,16 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation, super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier, groupExpression, logicalProperties); + Preconditions.checkArgument(selectedPartitionIds != null, "selectedPartitionIds can not be null"); this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds); this.partitionPruned = partitionPruned; this.selectedIndexId = selectedIndexId <= 0 ? 
getTable().getBaseIndexId() : selectedIndexId; this.indexSelected = indexSelected; this.preAggStatus = preAggStatus; this.manuallySpecifiedPartitions = ImmutableList.copyOf(partitions); - this.selectedPartitionIds = ImmutableList.copyOf( - Objects.requireNonNull(selectedPartitionIds, "selectedPartitionIds can not be null")); + this.selectedPartitionIds = selectedPartitionIds.stream() + .filter(partitionId -> this.getTable().getPartition(partitionId).hasData()).collect( + Collectors.toList()); this.hints = Objects.requireNonNull(hints, "hints can not be null"); this.mvNameToSlot = Objects.requireNonNull(mvNameToSlot, "mvNameToSlot can not be null"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java index 4cbc869191..8658278e07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java @@ -44,7 +44,7 @@ public class StringType extends CharacterType { @Override public boolean acceptsType(AbstractDataType other) { - return other instanceof StringType; + return other instanceof StringType || other instanceof VarcharType; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java index f7a7478de9..305daff8bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java @@ -54,7 +54,7 @@ public class VarcharType extends CharacterType { @Override public boolean acceptsType(AbstractDataType other) { - return other instanceof VarcharType; + return other instanceof VarcharType || other instanceof StringType; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index 662c7838e1..ea7b435c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -317,8 +317,13 @@ public class JoinUtils { final long rightTableId = rightHashSpec.getTableId(); final Set leftTablePartitions = leftHashSpec.getPartitionIds(); final Set rightTablePartitions = rightHashSpec.getPartitionIds(); - boolean noNeedCheckColocateGroup = (leftTableId == rightTableId) - && (leftTablePartitions.equals(rightTablePartitions)) && (leftTablePartitions.size() <= 1); + + // In UT, or when no partition is selected, getSelectedIndexId() == -1; see selectMaterializedView() + boolean hitSameIndex = (leftTableId == rightTableId) + && (leftHashSpec.getSelectedIndexId() != -1 && rightHashSpec.getSelectedIndexId() != -1) + && (leftHashSpec.getSelectedIndexId() == rightHashSpec.getSelectedIndexId()); + boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions)) + && (leftTablePartitions.size() <= 1); ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); if (noNeedCheckColocateGroup || (colocateIndex.isSameGroup(leftTableId, rightTableId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java index ea443bc610..d499fe87f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java @@ -227,7 +227,9 @@ public class TypeCoercionUtils { * cast input type if input's datatype is not same with dateType. */ public static Expression castIfNotSameType(Expression input, DataType targetType) { - if (input.getDataType().equals(targetType) || isSubqueryAndDataTypeIsBitmap(input)) { + if (input.getDataType().equals(targetType) || isSubqueryAndDataTypeIsBitmap(input) + || (isVarCharOrStringType(input.getDataType()) + && isVarCharOrStringType(targetType))) { return input; } else { checkCanCastTo(input.getDataType(), targetType); @@ -239,6 +241,10 @@ public class TypeCoercionUtils { return input instanceof SubqueryExpr && input.getDataType().isBitmapType(); } + private static boolean isVarCharOrStringType(DataType dataType) { + return dataType instanceof VarcharType || dataType instanceof StringType; + } + private static boolean canCastTo(DataType input, DataType target) { return Type.canCastTo(input.toCatalogDataType(), target.toCatalogDataType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 5999f33406..203146164a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -688,7 +688,11 @@ public class DistributedPlanner { // check the rhs join expr type is same as distribute column for (int j = 0; j < leftJoinColumnNames.size(); j++) { if (leftJoinColumnNames.get(j).equals(distributeColumnName)) { - if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) { + // varchar and string types don't need to check the length property + if ((rightExprs.get(j).getType().isVarcharOrStringType() + && leftDistributeColumns.get(i).getType().isVarcharOrStringType()) + || (rightExprs.get(j).getType() + .equals(leftDistributeColumns.get(i).getType()))) { rhsJoinExprs.add(rightExprs.get(j)); findRhsExprs = true; break; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index 99e9f3842f..a551fe8ba0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -229,7 +229,7 @@ public class RuntimeFilterTest extends SSBTestBase { List filters = getRuntimeFilters(sql).get(); Assertions.assertEquals(1, filters.size()); checkRuntimeFilterExprs(filters, ImmutableList.of( - Pair.of("cast(s_name as VARCHAR(*))", "p_name"))); + Pair.of("s_name", "p_name"))); } private Optional> getRuntimeFilters(String sql) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java index da971b0a45..a87de475b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.rules.rewrite.logical; +import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.PlanChecker; import 
org.apache.doris.utframe.TestWithFeService; @@ -130,6 +131,7 @@ class PruneOlapScanPartitionTest extends TestWithFeService implements MemoPatter notNullSingleColumnPartitionTable, multipleColumnsPartitionTable, notNullMultipleColumnsPartitionTable); + FeConstants.runningUnitTest = true; } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java index 50aff4011c..68351e6a9f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java @@ -113,6 +113,8 @@ class PruneOlapScanTabletTest implements MemoPatternMatchSupported { result = "t1"; olapTable.getPartition(anyLong); result = partition; + partition.hasData(); + result = true; partition.getIndex(anyLong); result = index; partition.getDistributionInfo(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java index 61c4365db1..6f6adc7050 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java @@ -20,7 +20,6 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.PlanChecker; @@ -30,7 +29,6 @@ import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; import java.util.List; -import java.util.Set; public class PushdownExpressionsInHashConditionTest extends TestWithFeService implements MemoPatternMatchSupported { @Override @@ -182,21 +180,4 @@ public class PushdownExpressionsInHashConditionTest extends TestWithFeService im ) ); } - - @Test - public void testNotPushDownWhenCast() { - PlanChecker.from(connectContext) - .analyze("SELECT * FROM T1 JOIN T2 ON T1.SCORE_INT = T2.SCORE") - .applyTopDown(new FindHashConditionForJoin()) - .applyTopDown(new PushdownExpressionsInHashCondition()) - .matchesFromRoot( - logicalProject( - logicalJoin( - logicalOlapScan(), - logicalOlapScan() - ).when(join -> !join.getHashJoinConjuncts().get(0) - .collect(Cast.class::isInstance).isEmpty()) - ) - ); - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java index b852bf5c9b..776b80ec47 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java @@ -375,7 +375,7 @@ public class AbstractDataTypeTest { Assertions.assertFalse(dataType.acceptsType(new DecimalV2Type(precision, scale))); Assertions.assertFalse(dataType.acceptsType(new CharType(new Random().nextInt()))); 
Assertions.assertTrue(dataType.acceptsType(new VarcharType(new Random().nextInt()))); - Assertions.assertFalse(dataType.acceptsType(StringType.INSTANCE)); + Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE)); Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE)); Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE)); } @@ -396,7 +396,7 @@ public class AbstractDataTypeTest { int scale = Math.min(precision, Math.abs(new Random().nextInt() % DecimalV2Type.MAX_SCALE)); Assertions.assertFalse(dataType.acceptsType(new DecimalV2Type(precision, scale))); Assertions.assertFalse(dataType.acceptsType(new CharType(new Random().nextInt()))); - Assertions.assertFalse(dataType.acceptsType(new VarcharType(new Random().nextInt()))); + Assertions.assertTrue(dataType.acceptsType(new VarcharType(new Random().nextInt()))); Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE)); Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE)); Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE)); diff --git a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy index 429d4f18ad..3c172d82e9 100644 --- a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy +++ b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy @@ -91,4 +91,50 @@ suite("test_bucket_shuffle_join") { contains "4:VHASH JOIN\n | join op: INNER JOIN(BUCKET_SHUFFLE)" contains "2:VHASH JOIN\n | join op: INNER JOIN(BUCKET_SHUFFLE)" } + + sql """ DROP TABLE IF EXISTS shuffle_join_t1 """ + sql """ DROP TABLE IF EXISTS shuffle_join_t2 """ + + sql """ + create table shuffle_join_t1 ( a varchar(10) not null ) + ENGINE=OLAP + DISTRIBUTED BY HASH(a) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """ + create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null ) + ENGINE=OLAP + DISTRIBUTED BY HASH(a) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """insert into shuffle_join_t1 values("1");""" + sql """insert into shuffle_join_t2 values("1","1","1");""" + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a;") + contains "BUCKET_SHUFFLE" + } + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;") + contains "BUCKET_SHUFFLE" + } + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;") + notContains "BUCKET_SHUFFLE" + } + + sql """ DROP TABLE IF EXISTS shuffle_join_t1 """ + sql """ DROP TABLE IF EXISTS shuffle_join_t2 """ } diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy index 62408acd8f..5a48d2a99f 100644 --- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy +++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy @@ -21,4 +21,62 @@ suite("bucket-shuffle-join") { order_qt_test_bucket """ select * from test_bucket_shuffle_join where rectime="2021-12-01 00:00:00" and id in (select k1 from test_join where k1 in (1,2)) """ + + sql """ DROP TABLE IF EXISTS shuffle_join_t1 """ + sql """ DROP TABLE IF EXISTS shuffle_join_t2 """ + + sql """ + create table shuffle_join_t1 ( a varchar(10) not 
null ) + ENGINE=OLAP + DISTRIBUTED BY HASH(a) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """ + create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null ) + ENGINE=OLAP + DISTRIBUTED BY HASH(a) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """insert into shuffle_join_t1 values("1");""" + sql """insert into shuffle_join_t1 values("1");""" + sql """insert into shuffle_join_t1 values("1");""" + sql """insert into shuffle_join_t1 values("1");""" + sql """insert into shuffle_join_t2 values("1","1","1");""" + sql """insert into shuffle_join_t2 values("1","1","1");""" + sql """insert into shuffle_join_t2 values("1","1","1");""" + sql """insert into shuffle_join_t2 values("1","1","1");""" + + sql """analyze table shuffle_join_t1;""" + sql """analyze table shuffle_join_t2;""" + + Thread.sleep(2000) + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a;") + contains "BUCKET_SHUFFLE" + } + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;") + contains "BUCKET_SHUFFLE" + } + + explain { + sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;") + contains "BUCKET_SHUFFLE" + contains "BUCKET_SHFFULE_HASH_PARTITIONED: expr_cast(c as VARCHAR(*))" + } + + sql """ DROP TABLE IF EXISTS shuffle_join_t1 """ + sql """ DROP TABLE IF EXISTS shuffle_join_t2 """ } diff --git a/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy b/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy new file mode 100644 index 0000000000..73ffd09b4f --- /dev/null +++ b/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("colocate_join_with_rollup", "nereids_p0") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + sql """ DROP TABLE IF EXISTS test_query_colocate1 """ + sql """ DROP TABLE IF EXISTS test_query_colocate2 """ + sql """ DROP TABLE IF EXISTS test_query_no_colocate """ + + sql """ + CREATE TABLE `test_query_colocate1` ( + `datekey` int(11) NULL, + `rollup_1_condition` int null, + `rollup_2_condition` int null, + `sum_col1` bigint(20) SUM NULL, + `sum_col2` bigint(20) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`) + COMMENT "" + PARTITION BY RANGE(`datekey`) + (PARTITION p20220102 VALUES [("20220101"), ("20220102")), + PARTITION p20220103 VALUES [("20220102"), ("20220103"))) + DISTRIBUTED BY HASH(`datekey`) BUCKETS 1 + rollup ( + rollup_1(datekey, sum_col1), + rollup_2(datekey, sum_col2) + ) + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "colocate_with" = "group1" + ); + """ + + sql """ + CREATE TABLE `test_query_colocate2` ( + `datekey` int(11) NULL, + `rollup_1_condition` int null, + `rollup_2_condition` int null, + `sum_col1` bigint(20) SUM NULL, + `sum_col2` bigint(20) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`) + COMMENT "" + PARTITION BY RANGE(`datekey`) + (PARTITION p20220102 VALUES [("20220101"), ("20220102")), + PARTITION p20220103 VALUES [("20220102"), ("20220103"))) + DISTRIBUTED BY HASH(`datekey`) BUCKETS 1 + rollup ( + rollup_1(datekey, sum_col1), + rollup_2(datekey, sum_col2) + ) + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "colocate_with" = "group1" + ); + """ + + sql """ + CREATE TABLE `test_query_no_colocate` ( + `datekey` int(11) NULL, + `rollup_1_condition` int null, + `rollup_2_condition` int null, + `sum_col1` bigint(20) SUM NULL, + `sum_col2` bigint(20) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`) + COMMENT "" + PARTITION BY RANGE(`datekey`) + (PARTITION p20220102 VALUES [("20220101"), ("20220110")), + PARTITION p20220103 VALUES [("20220110"), ("20220120"))) + DISTRIBUTED BY HASH(`datekey`) BUCKETS 5 + rollup ( + rollup_1(datekey, sum_col1), + rollup_2(datekey, sum_col2) + ) + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """insert into test_query_colocate1 values + (20220101, 102, 200, 200, 100), + (20220101, 101, 200, 200, 100), + (20220101, 102, 202, 200, 100), + (20220101, 101, 202, 200, 100);""" + + sql """insert into test_query_colocate2 values + (20220101, 102, 200, 200, 100), + (20220101, 101, 200, 200, 100), + (20220101, 102, 202, 200, 100), + (20220101, 101, 202, 200, 100);""" + + sql """insert into test_query_no_colocate values + (20220101, 102, 200, 200, 100), + (20220102, 101, 200, 200, 100), + (20220103, 102, 202, 200, 100), + (20220104, 101, 202, 200, 100), + (20220105, 102, 200, 200, 100), + (20220106, 101, 200, 200, 100), + (20220107, 102, 202, 200, 100), + (20220108, 101, 202, 200, 100);""" + + explain { + sql("""select sum_col1,sum_col2 + from + (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1 + left join + (select datekey,sum(sum_col2) as sum_col2 from test_query_colocate1 where datekey=20220101 group by datekey) t2 + on t1.datekey = t2.datekey""") 
+ contains "COLOCATE" + } + + explain { + sql("""select sum_col1,sum_col2 + from + (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1 + left join + (select datekey,sum(sum_col1) as sum_col2 from test_query_colocate2 where datekey=20220101 group by datekey) t2 + on t1.datekey = t2.datekey""") + contains "COLOCATE" + } + + explain { + sql("""select sum_col1,sum_col2 + from + (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1 + left join + (select datekey,sum(sum_col2) as sum_col2 from test_query_colocate2 where datekey=20220101 group by datekey) t2 + on t1.datekey = t2.datekey""") + contains "COLOCATE" + } + + explain { + // hit same rollup, colocate + sql("""select sum_col1,sum_col2 + from + (select datekey,sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1 + left join + (select datekey,sum(sum_col1) as sum_col2 from test_query_no_colocate group by datekey) t2 + on t1.datekey = t2.datekey""") + contains "COLOCATE" + } + + explain { + // hit same base table, colocate + sql("""select * + from + (select datekey from test_query_no_colocate ) t1 + left join + (select datekey from test_query_no_colocate ) t2 + on t1.datekey = t2.datekey""") + contains "COLOCATE" + } + + explain { + // hit different rollups, no colocate + sql("""select sum_col1,sum_col2 + from + (select datekey,sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1 + left join + (select datekey,sum(sum_col2) as sum_col2 from test_query_no_colocate group by datekey) t2 + on t1.datekey = t2.datekey""") + notContains "COLOCATE" + } + + sql """ DROP TABLE IF EXISTS test_query_colocate1 """ + sql """ DROP TABLE IF EXISTS test_query_colocate2 """ + sql """ DROP TABLE IF EXISTS test_query_no_colocate """ +}
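For item 3 above, a minimal sketch of why the other conjuncts matter, assuming hypothetical tables t1(a int, b int) and t2(a int, b int):

```
select * from t1 where exists (select 1 from t2 where t1.a + 1 = t2.a and t1.b < t2.b);
```

This typically plans as a left semi join whose hash conjunct t1.a + 1 = t2.a contains a non-slot expression, so PushdownExpressionsInHashCondition materializes it in a child project under an alias built with the "expr_" + toSql() scheme. Because t2's columns are not part of the semi join's output, the old intersection-based projection could drop t2.b even though the other conjunct t1.b < t2.b still reads it; collecting the other conjuncts' input slots into leftProjectExprs/rightProjectExprs keeps them alive in the child projects.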
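For item 4, a mark join slot appears when a subquery sits in a disjunction; with the same hypothetical tables:

```
select * from t1 where t1.b = 1 or exists (select 1 from t2 where t2.a = t1.a);
```

Such a query is typically planned with a mark join: the join emits a boolean MarkJoinSlotReference that the predicate above it consumes. visitPhysicalProject now checks getMarkJoinSlotReference() on a PhysicalHashJoin or PhysicalNestedLoopJoin child and, when present, appends the mark slot to slotList and its translated expression to execExprList, so the mark flag is not lost when a project sits directly on top of the join.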