[fix](nereids) bucket shuffle and colocate join are not correctly recognized (#17807)
1. Close https://github.com/apache/doris/issues/16458 for Nereids.
2. `varchar` and `string` should be treated as the same type in the bucket shuffle join scenario:

```
create table shuffle_join_t1 ( a varchar(10) not null )
create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null )
```

With these tables, the two SQLs below can use bucket shuffle join:

```
select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a;
select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;
```

3. PushdownExpressionsInHashCondition should consider both hash conjuncts and other conjuncts.
4. visitPhysicalProject should handle MarkJoinSlotReference.
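Two short query sketches make points 3 and 4 concrete (the tables and columns here are hypothetical, assumed for illustration only):

```
-- point 3: t1.a + 1 is not a slot, so it is pushed into a child project;
-- t1.x and t2.y appear only in the non-equi conjunct and must also
-- survive the generated child projections
select * from t1 join t2 on t1.a + 1 = t2.b and t1.x < t2.y;

-- point 4: the disjunction with a subquery produces a mark join, and the
-- project above the join must carry the mark slot through to its output
select * from t1 where t1.a = 1 or t1.b in (select b from t2);
```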
@@ -372,6 +372,11 @@ public abstract class Type {
                 || isScalarType(PrimitiveType.STRING);
     }
 
+    public boolean isVarcharOrStringType() {
+        return isScalarType(PrimitiveType.VARCHAR)
+                || isScalarType(PrimitiveType.STRING);
+    }
+
     public boolean isVarchar() {
         return isScalarType(PrimitiveType.VARCHAR);
     }
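The helper added above is what lets `varchar` and `string` distribution columns match each other while `char` stays excluded; a hedged SQL illustration, using the tables from the commit message (behavior as exercised by the regression tests later in this diff):

```
-- a (varchar) vs b (string): lengths and exact types differ, still bucket shuffle
select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;
-- a (varchar) vs c (char(3)): char is not covered by isVarcharOrStringType(),
-- so the legacy planner does not choose bucket shuffle here
select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;
```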
@@ -1366,11 +1366,18 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
     // TODO: generate expression mapping when be project could do in ExecNode.
     @Override
     public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanTranslatorContext context) {
+        MarkJoinSlotReference markJoinSlot = null;
         if (project.child(0) instanceof PhysicalHashJoin) {
             ((PhysicalHashJoin<?, ?>) project.child(0)).setShouldTranslateOutput(false);
+            if (((PhysicalHashJoin<?, ?>) project.child(0)).getMarkJoinSlotReference().isPresent()) {
+                markJoinSlot = (((PhysicalHashJoin<?, ?>) project.child(0)).getMarkJoinSlotReference().get());
+            }
         }
         if (project.child(0) instanceof PhysicalNestedLoopJoin) {
             ((PhysicalNestedLoopJoin<?, ?>) project.child(0)).setShouldTranslateOutput(false);
+            if (((PhysicalNestedLoopJoin<?, ?>) project.child(0)).getMarkJoinSlotReference().isPresent()) {
+                markJoinSlot = (((PhysicalNestedLoopJoin<?, ?>) project.child(0)).getMarkJoinSlotReference().get());
+            }
         }
         if (project.child(0) instanceof PhysicalFilter) {
             if (project.child(0).child(0) instanceof PhysicalHashJoin) {

@@ -1392,6 +1399,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
                 .stream()
                 .map(e -> e.toSlot())
                 .collect(Collectors.toList());
+
+        if (markJoinSlot != null) {
+            // add mark join slot to output
+            slotList.add(markJoinSlot);
+            execExprList.add(ExpressionTranslator.translate(markJoinSlot, context));
+        }
         // For hash join node, use vSrcToOutputSMap to describe the expression calculation, use
         // vIntermediateTupleDescList as input, and set vOutputTupleDesc as the final output.
         // TODO: HashJoinNode's be implementation is not support projection yet, remove this after when supported.
@@ -172,7 +172,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Double, Void> {
             rightShuffleIds.add(rightRequireSpec.getOrderedShuffledColumns().get(index));
         }
         return new PhysicalProperties(new DistributionSpecHash(rightShuffleIds, ShuffleType.ENFORCED,
-                rightHashSpec.getTableId(), rightHashSpec.getPartitionIds()));
+                rightHashSpec.getTableId(), rightHashSpec.getSelectedIndexId(), rightHashSpec.getPartitionIds()));
     }
 
     private double updateChildEnforceAndCost(GroupExpression child, PhysicalProperties childOutput,
@@ -51,6 +51,8 @@ public class DistributionSpecHash extends DistributionSpec {
 
     private final Set<Long> partitionIds;
 
+    private final long selectedIndexId;
+
     // use for satisfied judge
     private final List<Set<ExprId>> equivalenceExprIds;
 
@@ -79,14 +81,23 @@ public class DistributionSpecHash extends DistributionSpec {
     }
 
     /**
-     * Normal constructor.
+     * Used in ut
      */
     public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType,
             long tableId, Set<Long> partitionIds) {
+        this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds);
+    }
+
+    /**
+     * Normal constructor.
+     */
+    public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType,
+            long tableId, long selectedIndexId, Set<Long> partitionIds) {
         this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns);
         this.shuffleType = Objects.requireNonNull(shuffleType);
         this.partitionIds = Objects.requireNonNull(partitionIds);
         this.tableId = tableId;
+        this.selectedIndexId = selectedIndexId;
         equivalenceExprIds = Lists.newArrayListWithCapacity(orderedShuffledColumns.size());
         exprIdToEquivalenceSet = Maps.newHashMapWithExpectedSize(orderedShuffledColumns.size());
         int i = 0;

@@ -96,15 +107,26 @@ public class DistributionSpecHash extends DistributionSpec {
         }
     }
 
+    /**
+     * Used in ut
+     */
+    public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType,
+            long tableId, Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
+            Map<ExprId, Integer> exprIdToEquivalenceSet) {
+        this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, equivalenceExprIds,
+                exprIdToEquivalenceSet);
+    }
+
     /**
      * Used in merge outside and put result into it.
      */
     public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType, long tableId,
-            Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
+            long selectedIndexId, Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
             Map<ExprId, Integer> exprIdToEquivalenceSet) {
         this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns);
         this.shuffleType = Objects.requireNonNull(shuffleType);
         this.tableId = tableId;
+        this.selectedIndexId = selectedIndexId;
         this.partitionIds = Objects.requireNonNull(partitionIds);
         this.equivalenceExprIds = Objects.requireNonNull(equivalenceExprIds);
         this.exprIdToEquivalenceSet = Objects.requireNonNull(exprIdToEquivalenceSet);

@@ -124,7 +146,8 @@ public class DistributionSpecHash extends DistributionSpec {
         exprIdToEquivalenceSet.putAll(left.getExprIdToEquivalenceSet());
         exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet());
         return new DistributionSpecHash(orderedShuffledColumns, shuffleType,
-                left.getTableId(), left.getPartitionIds(), equivalenceExprIds, exprIdToEquivalenceSet);
+                left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds,
+                exprIdToEquivalenceSet);
     }
 
     static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right) {

@@ -143,6 +166,10 @@ public class DistributionSpecHash extends DistributionSpec {
         return tableId;
     }
 
+    public long getSelectedIndexId() {
+        return selectedIndexId;
+    }
+
     public Set<Long> getPartitionIds() {
         return partitionIds;
     }

@@ -219,7 +246,7 @@ public class DistributionSpecHash extends DistributionSpec {
     }
 
     public DistributionSpecHash withShuffleType(ShuffleType shuffleType) {
-        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, partitionIds,
+        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds,
                 equivalenceExprIds, exprIdToEquivalenceSet);
     }
 
@@ -256,7 +283,7 @@ public class DistributionSpecHash extends DistributionSpec {
                 exprIdToEquivalenceSet.put(exprIdSetKV.getKey(), exprIdSetKV.getValue());
             }
         }
-        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, partitionIds,
+        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds,
                 equivalenceExprIds, exprIdToEquivalenceSet);
     }
 
@@ -280,6 +307,7 @@ public class DistributionSpecHash extends DistributionSpec {
                 "orderedShuffledColumns", orderedShuffledColumns,
                 "shuffleType", shuffleType,
                 "tableId", tableId,
+                "selectedIndexId", selectedIndexId,
                 "partitionIds", partitionIds,
                 "equivalenceExprIds", equivalenceExprIds,
                 "exprIdToEquivalenceSet", exprIdToEquivalenceSet);
@@ -89,8 +89,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
             }
         }
         // TODO: need to consider colocate and dynamic partition and partition number
-        return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL,
-                olapScan.getTable().getId(), Sets.newHashSet(olapScan.getTable().getPartitionIds()));
+        return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, olapScan.getTable().getId(),
+                olapScan.getSelectedIndexId(), Sets.newHashSet(olapScan.getSelectedPartitionIds()));
     } else {
         // RandomDistributionInfo
         return DistributionSpecAny.INSTANCE;
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;

@@ -29,20 +30,16 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
-import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * push down expression which is not slot reference

@@ -67,52 +64,66 @@ public class PushdownExpressionsInHashCondition extends OneRewriteRuleFactory {
     public Rule build() {
         return logicalJoin()
                 .when(join -> join.getHashJoinConjuncts().stream().anyMatch(equalTo ->
-                        equalTo.children().stream().anyMatch(e -> !ExpressionUtils.checkTypeSkipCast(e, Slot.class))))
+                        equalTo.children().stream().anyMatch(e -> !(e instanceof Slot))))
                 .then(join -> {
-                    List<List<Expression>> exprsOfHashConjuncts =
-                            Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList());
-                    Map<Expression, NamedExpression> exprMap = Maps.newHashMap();
+                    Set<NamedExpression> leftProjectExprs = Sets.newHashSet();
+                    Set<NamedExpression> rightProjectExprs = Sets.newHashSet();
+                    Map<Expression, NamedExpression> exprReplaceMap = Maps.newHashMap();
                     join.getHashJoinConjuncts().forEach(conjunct -> {
                         Preconditions.checkArgument(conjunct instanceof EqualTo);
+                        // sometimes: t1 join t2 on t2.a + 1 = t1.a + 2, so check the situation, but actually it
+                        // doesn't swap the two sides.
                         conjunct = JoinUtils.swapEqualToForChildrenOrder(
                                 (EqualTo) conjunct, join.left().getOutputSet());
-                        exprsOfHashConjuncts.get(0).add(conjunct.child(0));
-                        exprsOfHashConjuncts.get(1).add(conjunct.child(1));
-                        conjunct.children().forEach(expr -> {
-                            if ((expr instanceof SlotReference)) {
-                                exprMap.put(expr, (SlotReference) expr);
-                            } else {
-                                exprMap.put(expr, new Alias(expr, "expr_" + expr.toSql()));
-                            }
-                        });
+                        generateReplaceMapAndProjectExprs(conjunct.child(0), exprReplaceMap, leftProjectExprs);
+                        generateReplaceMapAndProjectExprs(conjunct.child(1), exprReplaceMap, rightProjectExprs);
                     });
-                    Iterator<List<Expression>> iter = exprsOfHashConjuncts.iterator();
+
+                    // add other conjuncts used slots to project exprs
+                    Set<ExprId> leftExprIdSet = join.left().getOutputExprIdSet();
+                    join.getOtherJoinConjuncts().stream().flatMap(conjunct ->
+                            conjunct.getInputSlots().stream()
+                    ).forEach(slot -> {
+                        if (leftExprIdSet.contains(slot.getExprId())) {
+                            // belong to left child
+                            leftProjectExprs.add(slot);
+                        } else {
+                            // belong to right child
+                            rightProjectExprs.add(slot);
+                        }
+                    });
+
+                    List<Expression> newHashConjuncts = join.getHashJoinConjuncts().stream()
+                            .map(equalTo -> equalTo.withChildren(equalTo.children()
+                                    .stream().map(expr -> exprReplaceMap.get(expr).toSlot())
+                                    .collect(ImmutableList.toImmutableList())))
+                            .collect(ImmutableList.toImmutableList());
                     return join.withHashJoinConjunctsAndChildren(
-                            join.getHashJoinConjuncts().stream()
-                                    .map(equalTo -> equalTo.withChildren(equalTo.children()
-                                            .stream().map(expr -> exprMap.get(expr).toSlot())
-                                            .collect(ImmutableList.toImmutableList())))
-                                    .collect(ImmutableList.toImmutableList()),
-                            join.children().stream().map(
-                                    plan -> {
-                                        Set<NamedExpression> projectSet = Sets.newHashSet();
-                                        projectSet.addAll(iter.next().stream().map(exprMap::get)
-                                                .collect(Collectors.toList()));
-                                        projectSet.addAll(getOutput(plan, join));
-                                        List<NamedExpression> projectList = projectSet.stream()
-                                                .collect(ImmutableList.toImmutableList());
-                                        return new LogicalProject<>(projectList, plan);
-                                    }
-                            )
-                            .collect(ImmutableList.toImmutableList()));
+                            newHashConjuncts,
+                            createChildProjectPlan(join.left(), join, leftProjectExprs),
+                            createChildProjectPlan(join.right(), join, rightProjectExprs));
                 }).toRule(RuleType.PUSHDOWN_EXPRESSIONS_IN_HASH_CONDITIONS);
     }
 
-    private List<Slot> getOutput(Plan plan, LogicalJoin join) {
-        Set<Slot> intersectionSlots = Sets.newHashSet(plan.getOutputSet());
+    private LogicalProject createChildProjectPlan(Plan plan, LogicalJoin join,
+            Set<NamedExpression> conditionUsedExprs) {
+        Set<NamedExpression> intersectionSlots = Sets.newHashSet(plan.getOutputSet());
         intersectionSlots.retainAll(join.getOutputSet());
-        return Lists.newArrayList(intersectionSlots);
+        intersectionSlots.addAll(conditionUsedExprs);
+        return new LogicalProject(intersectionSlots.stream()
+                .collect(ImmutableList.toImmutableList()), plan);
+    }
+
+    private void generateReplaceMapAndProjectExprs(Expression expr, Map<Expression, NamedExpression> replaceMap,
+            Set<NamedExpression> projects) {
+        if (expr instanceof SlotReference) {
+            projects.add((SlotReference) expr);
+            replaceMap.put(expr, (SlotReference) expr);
+        } else {
+            Alias alias = new Alias(expr, "expr_" + expr.toSql());
+            if (replaceMap.putIfAbsent(expr, alias.toSlot()) == null) {
+                projects.add(alias);
+            }
+        }
+    }
 }
@@ -285,6 +285,13 @@ public class LogicalJoin<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE extends
                 children.get(1), joinReorderContext);
     }
 
+    public LogicalJoin<Plan, Plan> withHashJoinConjunctsAndChildren(
+            List<Expression> hashJoinConjuncts, Plan left, Plan right) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinConjuncts, hint,
+                markJoinSlotReference, left, right, joinReorderContext);
+    }
+
     public LogicalJoin<Plan, Plan> withConjunctsChildren(List<Expression> hashJoinConjuncts,
             List<Expression> otherJoinConjuncts, Plan left, Plan right) {
         return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinConjuncts, hint, markJoinSlotReference, left,
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**

@@ -143,14 +144,16 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation,
 
         super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
                 groupExpression, logicalProperties);
+        Preconditions.checkArgument(selectedPartitionIds != null, "selectedPartitionIds can not be null");
         this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds);
         this.partitionPruned = partitionPruned;
         this.selectedIndexId = selectedIndexId <= 0 ? getTable().getBaseIndexId() : selectedIndexId;
         this.indexSelected = indexSelected;
         this.preAggStatus = preAggStatus;
         this.manuallySpecifiedPartitions = ImmutableList.copyOf(partitions);
-        this.selectedPartitionIds = ImmutableList.copyOf(
-                Objects.requireNonNull(selectedPartitionIds, "selectedPartitionIds can not be null"));
+        this.selectedPartitionIds = selectedPartitionIds.stream()
+                .filter(partitionId -> this.getTable().getPartition(partitionId).hasData()).collect(
+                        Collectors.toList());
         this.hints = Objects.requireNonNull(hints, "hints can not be null");
         this.mvNameToSlot = Objects.requireNonNull(mvNameToSlot, "mvNameToSlot can not be null");
     }
@@ -44,7 +44,7 @@ public class StringType extends CharacterType {
 
     @Override
     public boolean acceptsType(AbstractDataType other) {
-        return other instanceof StringType;
+        return other instanceof StringType || other instanceof VarcharType;
     }
 
     @Override
@@ -54,7 +54,7 @@ public class VarcharType extends CharacterType {
 
     @Override
     public boolean acceptsType(AbstractDataType other) {
-        return other instanceof VarcharType;
+        return other instanceof VarcharType || other instanceof StringType;
     }
 
     @Override
@@ -317,8 +317,13 @@ public class JoinUtils {
         final long rightTableId = rightHashSpec.getTableId();
         final Set<Long> leftTablePartitions = leftHashSpec.getPartitionIds();
         final Set<Long> rightTablePartitions = rightHashSpec.getPartitionIds();
-        boolean noNeedCheckColocateGroup = (leftTableId == rightTableId)
-                && (leftTablePartitions.equals(rightTablePartitions)) && (leftTablePartitions.size() <= 1);
+
+        // For UT or no partition is selected, getSelectedIndexId() == -1, see selectMaterializedView()
+        boolean hitSameIndex = (leftTableId == rightTableId)
+                && (leftHashSpec.getSelectedIndexId() != -1 && rightHashSpec.getSelectedIndexId() != -1)
+                && (leftHashSpec.getSelectedIndexId() == rightHashSpec.getSelectedIndexId());
+        boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions))
+                && (leftTablePartitions.size() <= 1);
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
         if (noNeedCheckColocateGroup
                 || (colocateIndex.isSameGroup(leftTableId, rightTableId)
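In effect, two scans of the same table now also have to hit the same materialized index before the colocate-group check may be skipped. A hedged SQL sketch of the distinction, taken from the colocate_join_with_rollup suite added at the end of this commit:

```
-- both sides aggregate sum_col1, so both scans hit rollup_1: COLOCATE
select sum_col1, sum_col2
from (select datekey, sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1
left join (select datekey, sum(sum_col1) as sum_col2 from test_query_no_colocate group by datekey) t2
on t1.datekey = t2.datekey;

-- the sides hit rollup_1 and rollup_2 respectively: no COLOCATE
select sum_col1, sum_col2
from (select datekey, sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1
left join (select datekey, sum(sum_col2) as sum_col2 from test_query_no_colocate group by datekey) t2
on t1.datekey = t2.datekey;
```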
@@ -227,7 +227,9 @@ public class TypeCoercionUtils {
      * cast input type if input's datatype is not same with dateType.
      */
     public static Expression castIfNotSameType(Expression input, DataType targetType) {
-        if (input.getDataType().equals(targetType) || isSubqueryAndDataTypeIsBitmap(input)) {
+        if (input.getDataType().equals(targetType) || isSubqueryAndDataTypeIsBitmap(input)
+                || (isVarCharOrStringType(input.getDataType())
+                && isVarCharOrStringType(targetType))) {
             return input;
         } else {
             checkCanCastTo(input.getDataType(), targetType);

@@ -239,6 +241,10 @@ public class TypeCoercionUtils {
         return input instanceof SubqueryExpr && input.getDataType().isBitmapType();
     }
 
+    private static boolean isVarCharOrStringType(DataType dataType) {
+        return dataType instanceof VarcharType || dataType instanceof StringType;
+    }
+
     private static boolean canCastTo(DataType input, DataType target) {
         return Type.canCastTo(input.toCatalogDataType(), target.toCatalogDataType());
     }
@@ -688,7 +688,11 @@ public class DistributedPlanner {
                 // check the rhs join expr type is same as distribute column
                 for (int j = 0; j < leftJoinColumnNames.size(); j++) {
                     if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
-                        if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
+                        // varchar and string type don't need to check the length property
+                        if ((rightExprs.get(j).getType().isVarcharOrStringType()
+                                && leftDistributeColumns.get(i).getType().isVarcharOrStringType())
+                                || (rightExprs.get(j).getType()
+                                        .equals(leftDistributeColumns.get(i).getType()))) {
                             rhsJoinExprs.add(rightExprs.get(j));
                             findRhsExprs = true;
                             break;
@@ -229,7 +229,7 @@ public class RuntimeFilterTest extends SSBTestBase {
         List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
         Assertions.assertEquals(1, filters.size());
         checkRuntimeFilterExprs(filters, ImmutableList.of(
-                Pair.of("cast(s_name as VARCHAR(*))", "p_name")));
+                Pair.of("s_name", "p_name")));
     }
 
     private Optional<List<RuntimeFilter>> getRuntimeFilters(String sql) {
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;

@@ -130,6 +131,7 @@ class PruneOlapScanPartitionTest extends TestWithFeService implements MemoPatternMatchSupported {
                 notNullSingleColumnPartitionTable,
                 multipleColumnsPartitionTable,
                 notNullMultipleColumnsPartitionTable);
+        FeConstants.runningUnitTest = true;
     }
 
     @Test
@@ -113,6 +113,8 @@ class PruneOlapScanTabletTest implements MemoPatternMatchSupported {
                 result = "t1";
                 olapTable.getPartition(anyLong);
                 result = partition;
+                partition.hasData();
+                result = true;
                 partition.getIndex(anyLong);
                 result = index;
                 partition.getDistributionInfo();
@@ -20,7 +20,6 @@ package org.apache.doris.nereids.rules.rewrite.logical;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.PlanChecker;

@@ -30,7 +29,6 @@ import com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
-import java.util.Set;
 
 public class PushdownExpressionsInHashConditionTest extends TestWithFeService implements MemoPatternMatchSupported {
     @Override

@@ -182,21 +180,4 @@ public class PushdownExpressionsInHashConditionTest extends TestWithFeService implements MemoPatternMatchSupported {
                 )
         );
     }
-
-    @Test
-    public void testNotPushDownWhenCast() {
-        PlanChecker.from(connectContext)
-                .analyze("SELECT * FROM T1 JOIN T2 ON T1.SCORE_INT = T2.SCORE")
-                .applyTopDown(new FindHashConditionForJoin())
-                .applyTopDown(new PushdownExpressionsInHashCondition())
-                .matchesFromRoot(
-                        logicalProject(
-                                logicalJoin(
-                                        logicalOlapScan(),
-                                        logicalOlapScan()
-                                ).when(join -> !join.getHashJoinConjuncts().get(0)
-                                        .<Set>collect(Cast.class::isInstance).isEmpty())
-                        )
-                );
-    }
 }
@@ -375,7 +375,7 @@ public class AbstractDataTypeTest {
         Assertions.assertFalse(dataType.acceptsType(new DecimalV2Type(precision, scale)));
         Assertions.assertFalse(dataType.acceptsType(new CharType(new Random().nextInt())));
         Assertions.assertTrue(dataType.acceptsType(new VarcharType(new Random().nextInt())));
-        Assertions.assertFalse(dataType.acceptsType(StringType.INSTANCE));
+        Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE));
     }

@@ -396,7 +396,7 @@ public class AbstractDataTypeTest {
         int scale = Math.min(precision, Math.abs(new Random().nextInt() % DecimalV2Type.MAX_SCALE));
         Assertions.assertFalse(dataType.acceptsType(new DecimalV2Type(precision, scale)));
         Assertions.assertFalse(dataType.acceptsType(new CharType(new Random().nextInt())));
-        Assertions.assertFalse(dataType.acceptsType(new VarcharType(new Random().nextInt())));
+        Assertions.assertTrue(dataType.acceptsType(new VarcharType(new Random().nextInt())));
         Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE));
@@ -91,4 +91,50 @@ suite("test_bucket_shuffle_join") {
         contains "4:VHASH JOIN\n | join op: INNER JOIN(BUCKET_SHUFFLE)"
         contains "2:VHASH JOIN\n | join op: INNER JOIN(BUCKET_SHUFFLE)"
     }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
+
+    sql """
+        create table shuffle_join_t1 ( a varchar(10) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        );
+    """
+
+    sql """
+        create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        );
+    """
+
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;")
+        notContains "BUCKET_SHUFFLE"
+    }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
 }
@@ -21,4 +21,62 @@ suite("bucket-shuffle-join") {
     order_qt_test_bucket """
         select * from test_bucket_shuffle_join where rectime="2021-12-01 00:00:00" and id in (select k1 from test_join where k1 in (1,2))
     """
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
+
+    sql """
+        create table shuffle_join_t1 ( a varchar(10) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        );
+    """
+
+    sql """
+        create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c char(3) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        );
+    """
+
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+
+    sql """analyze table shuffle_join_t1;"""
+    sql """analyze table shuffle_join_t2;"""
+
+    Thread.sleep(2000)
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.a;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.b;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;")
+        contains "BUCKET_SHUFFLE"
+        contains "BUCKET_SHFFULE_HASH_PARTITIONED: expr_cast(c as VARCHAR(*))"
+    }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
 }
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("colocate_join_with_rollup", "nereids_p0") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    sql """ DROP TABLE IF EXISTS test_query_colocate1 """
+    sql """ DROP TABLE IF EXISTS test_query_colocate2 """
+    sql """ DROP TABLE IF EXISTS test_query_no_colocate """
+
+    sql """
+        CREATE TABLE `test_query_colocate1` (
+            `datekey` int(11) NULL,
+            `rollup_1_condition` int null,
+            `rollup_2_condition` int null,
+            `sum_col1` bigint(20) SUM NULL,
+            `sum_col2` bigint(20) SUM NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+        COMMENT ""
+        PARTITION BY RANGE(`datekey`)
+        (PARTITION p20220102 VALUES [("20220101"), ("20220102")),
+        PARTITION p20220103 VALUES [("20220102"), ("20220103")))
+        DISTRIBUTED BY HASH(`datekey`) BUCKETS 1
+        rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+        )
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2",
+            "colocate_with" = "group1"
+        );
+    """
+
+    sql """
+        CREATE TABLE `test_query_colocate2` (
+            `datekey` int(11) NULL,
+            `rollup_1_condition` int null,
+            `rollup_2_condition` int null,
+            `sum_col1` bigint(20) SUM NULL,
+            `sum_col2` bigint(20) SUM NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+        COMMENT ""
+        PARTITION BY RANGE(`datekey`)
+        (PARTITION p20220102 VALUES [("20220101"), ("20220102")),
+        PARTITION p20220103 VALUES [("20220102"), ("20220103")))
+        DISTRIBUTED BY HASH(`datekey`) BUCKETS 1
+        rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+        )
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2",
+            "colocate_with" = "group1"
+        );
+    """
+
+    sql """
+        CREATE TABLE `test_query_no_colocate` (
+            `datekey` int(11) NULL,
+            `rollup_1_condition` int null,
+            `rollup_2_condition` int null,
+            `sum_col1` bigint(20) SUM NULL,
+            `sum_col2` bigint(20) SUM NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+        COMMENT ""
+        PARTITION BY RANGE(`datekey`)
+        (PARTITION p20220102 VALUES [("20220101"), ("20220110")),
+        PARTITION p20220103 VALUES [("20220110"), ("20220120")))
+        DISTRIBUTED BY HASH(`datekey`) BUCKETS 5
+        rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+        )
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        );
+    """
+
+    sql """insert into test_query_colocate1 values
+            (20220101, 102, 200, 200, 100),
+            (20220101, 101, 200, 200, 100),
+            (20220101, 102, 202, 200, 100),
+            (20220101, 101, 202, 200, 100);"""
+
+    sql """insert into test_query_colocate2 values
+            (20220101, 102, 200, 200, 100),
+            (20220101, 101, 200, 200, 100),
+            (20220101, 102, 202, 200, 100),
+            (20220101, 101, 202, 200, 100);"""
+
+    sql """insert into test_query_no_colocate values
+            (20220101, 102, 200, 200, 100),
+            (20220102, 101, 200, 200, 100),
+            (20220103, 102, 202, 200, 100),
+            (20220104, 101, 202, 200, 100),
+            (20220105, 102, 200, 200, 100),
+            (20220106, 101, 200, 200, 100),
+            (20220107, 102, 202, 200, 100),
+            (20220108, 101, 202, 200, 100);"""
+
+    explain {
+        sql("""select sum_col1,sum_col2
+                from
+                (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1
+                left join
+                (select datekey,sum(sum_col2) as sum_col2 from test_query_colocate1 where datekey=20220101 group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        sql("""select sum_col1,sum_col2
+                from
+                (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1
+                left join
+                (select datekey,sum(sum_col1) as sum_col2 from test_query_colocate2 where datekey=20220101 group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        sql("""select sum_col1,sum_col2
+                from
+                (select datekey,sum(sum_col1) as sum_col1 from test_query_colocate1 where datekey=20220101 group by datekey) t1
+                left join
+                (select datekey,sum(sum_col2) as sum_col2 from test_query_colocate2 where datekey=20220101 group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit same rollup, colocate
+        sql("""select sum_col1,sum_col2
+                from
+                (select datekey,sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1
+                left join
+                (select datekey,sum(sum_col1) as sum_col2 from test_query_no_colocate group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit same base table, colocate
+        sql("""select *
+                from
+                (select datekey from test_query_no_colocate ) t1
+                left join
+                (select datekey from test_query_no_colocate ) t2
+                on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit different rollup, no colocate
+        sql("""select sum_col1,sum_col2
+                from
+                (select datekey,sum(sum_col1) as sum_col1 from test_query_no_colocate group by datekey) t1
+                left join
+                (select datekey,sum(sum_col2) as sum_col2 from test_query_no_colocate group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        notContains "COLOCATE"
+    }
+
+    sql """ DROP TABLE IF EXISTS test_query_colocate1 """
+    sql """ DROP TABLE IF EXISTS test_query_colocate2 """
+    sql """ DROP TABLE IF EXISTS test_query_no_colocate """
+}