[fix](Nereids) physical property deriver on some node is not right (#30819)
This commit is contained in:
@ -97,7 +97,16 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
|
||||
@Override
|
||||
public PhysicalProperties visit(Plan plan, PlanContext context) {
|
||||
return PhysicalProperties.ANY;
|
||||
if (childrenOutputProperties.isEmpty()) {
|
||||
return PhysicalProperties.ANY;
|
||||
} else {
|
||||
DistributionSpec firstChildSpec = childrenOutputProperties.get(0).getDistributionSpec();
|
||||
if (firstChildSpec instanceof DistributionSpecHash) {
|
||||
return PhysicalProperties.createAnyFromHash((DistributionSpecHash) firstChildSpec);
|
||||
} else {
|
||||
return PhysicalProperties.ANY;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ********************************************************************************************
|
||||
@ -116,7 +125,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
@Override
|
||||
public PhysicalProperties visitPhysicalCTEConsumer(
|
||||
PhysicalCTEConsumer cteConsumer, PlanContext context) {
|
||||
Preconditions.checkState(childrenOutputProperties.size() == 0);
|
||||
Preconditions.checkState(childrenOutputProperties.isEmpty(), "cte consumer should be leaf node");
|
||||
return PhysicalProperties.MUST_SHUFFLE;
|
||||
}
|
||||
|
||||
@ -279,7 +288,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
leftHashSpec.getShuffleType()));
|
||||
}
|
||||
case FULL_OUTER_JOIN:
|
||||
return PhysicalProperties.ANY;
|
||||
return PhysicalProperties.createAnyFromHash(leftHashSpec);
|
||||
default:
|
||||
throw new AnalysisException("unknown join type " + hashJoin.getJoinType());
|
||||
}
|
||||
@ -348,7 +357,12 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
@Override
|
||||
public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanContext context) {
|
||||
Preconditions.checkState(childrenOutputProperties.size() == 1);
|
||||
return PhysicalProperties.ANY.withOrderSpec(childrenOutputProperties.get(0).getOrderSpec());
|
||||
DistributionSpec childDistributionSpec = childrenOutputProperties.get(0).getDistributionSpec();
|
||||
PhysicalProperties output = childrenOutputProperties.get(0);
|
||||
if (childDistributionSpec instanceof DistributionSpecHash) {
|
||||
output = PhysicalProperties.createAnyFromHash((DistributionSpecHash) childDistributionSpec);
|
||||
}
|
||||
return output.withOrderSpec(childrenOutputProperties.get(0).getOrderSpec());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -386,7 +400,12 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
for (int i = 0; i < childrenDistribution.size(); i++) {
|
||||
DistributionSpec childDistribution = childrenDistribution.get(i);
|
||||
if (!(childDistribution instanceof DistributionSpecHash)) {
|
||||
return PhysicalProperties.ANY;
|
||||
if (i != 0) {
|
||||
// NOTICE: if come here, the first child output must be DistributionSpecHash
|
||||
return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
|
||||
} else {
|
||||
return new PhysicalProperties(childDistribution);
|
||||
}
|
||||
}
|
||||
DistributionSpecHash distributionSpecHash = (DistributionSpecHash) childDistribution;
|
||||
int[] offsetsOfCurrentChild = new int[distributionSpecHash.getOrderedShuffledColumns().size()];
|
||||
@ -396,7 +415,8 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
if (offset >= 0) {
|
||||
offsetsOfCurrentChild[offset] = j;
|
||||
} else {
|
||||
return PhysicalProperties.ANY;
|
||||
// NOTICE: if come here, the first child output must be DistributionSpecHash
|
||||
return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
|
||||
}
|
||||
}
|
||||
if (offsetsOfFirstChild == null) {
|
||||
@ -404,7 +424,8 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
offsetsOfFirstChild = offsetsOfCurrentChild;
|
||||
} else if (!Arrays.equals(offsetsOfFirstChild, offsetsOfCurrentChild)
|
||||
|| firstType != ((DistributionSpecHash) childDistribution).getShuffleType()) {
|
||||
return PhysicalProperties.ANY;
|
||||
// NOTICE: if come here, the first child output must be DistributionSpecHash
|
||||
return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
|
||||
}
|
||||
}
|
||||
// bucket
|
||||
@ -422,7 +443,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
} else {
|
||||
// current be could not run const expr on appropriate node,
|
||||
// so if we have constant exprs on union, the output of union always any
|
||||
return PhysicalProperties.ANY;
|
||||
return visit(union, context);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -93,6 +93,14 @@ public class PhysicalProperties {
|
||||
return new PhysicalProperties(distributionSpecHash);
|
||||
}
|
||||
|
||||
public static PhysicalProperties createAnyFromHash(DistributionSpecHash childSpec) {
|
||||
if (childSpec.getShuffleType() == ShuffleType.NATURAL) {
|
||||
return PhysicalProperties.STORAGE_ANY;
|
||||
} else {
|
||||
return PhysicalProperties.ANY;
|
||||
}
|
||||
}
|
||||
|
||||
public PhysicalProperties withOrderSpec(OrderSpec orderSpec) {
|
||||
return new PhysicalProperties(distributionSpec, orderSpec);
|
||||
}
|
||||
|
||||
@ -459,7 +459,7 @@ class ChildOutputPropertyDeriverTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFullOuterJoin() {
|
||||
void testFullOuterJoinWithNatural() {
|
||||
PhysicalHashJoin<GroupPlan, GroupPlan> join = new PhysicalHashJoin<>(JoinType.FULL_OUTER_JOIN,
|
||||
ExpressionUtils.EMPTY_CONDITION, ExpressionUtils.EMPTY_CONDITION, new DistributeHint(DistributeType.NONE), Optional.empty(), logicalProperties,
|
||||
groupPlan, groupPlan);
|
||||
@ -490,7 +490,42 @@ class ChildOutputPropertyDeriverTest {
|
||||
|
||||
PhysicalProperties result = deriver.getOutputProperties(null, groupExpression);
|
||||
Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
|
||||
Assertions.assertTrue(result.getDistributionSpec() instanceof DistributionSpecAny);
|
||||
Assertions.assertInstanceOf(DistributionSpecStorageAny.class, result.getDistributionSpec());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFullOuterJoinWithOther() {
|
||||
PhysicalHashJoin<GroupPlan, GroupPlan> join = new PhysicalHashJoin<>(JoinType.FULL_OUTER_JOIN,
|
||||
ExpressionUtils.EMPTY_CONDITION, ExpressionUtils.EMPTY_CONDITION, new DistributeHint(DistributeType.NONE), Optional.empty(), logicalProperties,
|
||||
groupPlan, groupPlan);
|
||||
GroupExpression groupExpression = new GroupExpression(join);
|
||||
new Group(null, groupExpression, null);
|
||||
|
||||
PhysicalProperties left = new PhysicalProperties(
|
||||
new DistributionSpecHash(
|
||||
Lists.newArrayList(new ExprId(0)),
|
||||
ShuffleType.EXECUTION_BUCKETED,
|
||||
0,
|
||||
Sets.newHashSet(0L)
|
||||
),
|
||||
new OrderSpec(
|
||||
Lists.newArrayList(new OrderKey(new SlotReference("ignored", IntegerType.INSTANCE),
|
||||
true, true)))
|
||||
);
|
||||
|
||||
PhysicalProperties right = new PhysicalProperties(new DistributionSpecHash(
|
||||
Lists.newArrayList(new ExprId(1)),
|
||||
ShuffleType.EXECUTION_BUCKETED,
|
||||
1,
|
||||
Sets.newHashSet(1L)
|
||||
));
|
||||
|
||||
List<PhysicalProperties> childrenOutputProperties = Lists.newArrayList(left, right);
|
||||
ChildOutputPropertyDeriver deriver = new ChildOutputPropertyDeriver(childrenOutputProperties);
|
||||
|
||||
PhysicalProperties result = deriver.getOutputProperties(null, groupExpression);
|
||||
Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
|
||||
Assertions.assertInstanceOf(DistributionSpecAny.class, result.getDistributionSpec());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user