[pipelineX](bug) Fix incorrect join operator judgement (#31690)
* [pipelineX](bug) Fix incorrect join operator judgement * update
This commit is contained in:
@ -1205,9 +1205,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
hashJoinNode.setDistributeExprLists(distributeExprLists);
|
||||
PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin);
|
||||
|
||||
if (joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
currentFragment.setHasNullAwareLeftAntiJoin(true);
|
||||
}
|
||||
if (JoinUtils.shouldColocateJoin(physicalHashJoin)) {
|
||||
// TODO: add reason
|
||||
hashJoinNode.setColocate(true, "");
|
||||
|
||||
@ -221,8 +221,6 @@ public class PlanTranslatorContext {
|
||||
srcFragment.getBuilderRuntimeFilterIds().forEach(targetFragment::setBuilderRuntimeFilterIds);
|
||||
targetFragment.setHasColocatePlanNode(targetFragment.hasColocatePlanNode()
|
||||
|| srcFragment.hasColocatePlanNode());
|
||||
targetFragment.setHasNullAwareLeftAntiJoin(targetFragment.isHasNullAwareLeftAntiJoin()
|
||||
|| srcFragment.isHasNullAwareLeftAntiJoin());
|
||||
this.planFragments.remove(srcFragment);
|
||||
}
|
||||
|
||||
|
||||
@ -784,10 +784,8 @@ public class HashJoinNode extends JoinNodeBase {
|
||||
if (eqJoinConjuncts.isEmpty()) {
|
||||
Preconditions.checkState(joinOp == JoinOperator.LEFT_SEMI_JOIN
|
||||
|| joinOp == JoinOperator.LEFT_ANTI_JOIN);
|
||||
if (joinOp == JoinOperator.LEFT_SEMI_JOIN) {
|
||||
msg.hash_join_node.join_op = JoinOperator.NULL_AWARE_LEFT_SEMI_JOIN.toThrift();
|
||||
} else if (joinOp == JoinOperator.LEFT_ANTI_JOIN) {
|
||||
msg.hash_join_node.join_op = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN.toThrift();
|
||||
if (joinOp == JoinOperator.LEFT_SEMI_JOIN || joinOp == JoinOperator.LEFT_ANTI_JOIN) {
|
||||
msg.hash_join_node.join_op = transformJoinOperator().toThrift();
|
||||
}
|
||||
// because eqJoinConjuncts mustn't be empty in thrift
|
||||
// we have to use markJoinConjuncts instead
|
||||
@ -970,4 +968,22 @@ public class HashJoinNode extends JoinNodeBase {
|
||||
return slotRef;
|
||||
}
|
||||
}
|
||||
|
||||
private JoinOperator transformJoinOperator() {
|
||||
boolean transformToNullAware = markJoinConjuncts != null && eqJoinConjuncts.isEmpty();
|
||||
if (joinOp == JoinOperator.LEFT_ANTI_JOIN && transformToNullAware) {
|
||||
return JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
} else if (joinOp == JoinOperator.LEFT_SEMI_JOIN && transformToNullAware) {
|
||||
return JoinOperator.NULL_AWARE_LEFT_SEMI_JOIN;
|
||||
}
|
||||
return joinOp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isNullAwareLeftAntiJoin() {
|
||||
if (transformJoinOperator() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return true;
|
||||
}
|
||||
return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
|
||||
}
|
||||
}
|
||||
|
||||
@ -32,7 +32,6 @@ public class MultiCastPlanFragment extends PlanFragment {
|
||||
super(planFragment.getFragmentId(), planFragment.getPlanRoot(), planFragment.getDataPartition(),
|
||||
planFragment.getBuilderRuntimeFilterIds(), planFragment.getTargetRuntimeFilterIds());
|
||||
this.hasColocatePlanNode = planFragment.hasColocatePlanNode;
|
||||
this.hasNullAwareLeftAntiJoin = planFragment.hasNullAwareLeftAntiJoin;
|
||||
this.outputPartition = DataPartition.RANDOM;
|
||||
this.children.addAll(planFragment.getChildren());
|
||||
}
|
||||
|
||||
@ -149,8 +149,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
// has colocate plan node
|
||||
protected boolean hasColocatePlanNode = false;
|
||||
|
||||
protected boolean hasNullAwareLeftAntiJoin = false;
|
||||
|
||||
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
|
||||
|
||||
/**
|
||||
@ -473,11 +471,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
this.bucketNum = bucketNum;
|
||||
}
|
||||
|
||||
public boolean isHasNullAwareLeftAntiJoin() {
|
||||
return hasNullAwareLeftAntiJoin;
|
||||
}
|
||||
|
||||
public void setHasNullAwareLeftAntiJoin(boolean hasNullAwareLeftAntiJoin) {
|
||||
this.hasNullAwareLeftAntiJoin = hasNullAwareLeftAntiJoin;
|
||||
public boolean hasNullAwareLeftAntiJoin() {
|
||||
return planRoot.isNullAwareLeftAntiJoin();
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,6 +261,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
this.fragment = fragment;
|
||||
}
|
||||
|
||||
public boolean isNullAwareLeftAntiJoin() {
|
||||
return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
|
||||
}
|
||||
|
||||
public PlanFragment getFragment() {
|
||||
return fragment;
|
||||
}
|
||||
|
||||
@ -726,7 +726,7 @@ public abstract class ScanNode extends PlanNode {
|
||||
return context != null
|
||||
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
|
||||
&& context.getSessionVariable().getEnablePipelineXEngine()
|
||||
&& !fragment.isHasNullAwareLeftAntiJoin()
|
||||
&& !fragment.hasNullAwareLeftAntiJoin()
|
||||
&& getScanRangeNum()
|
||||
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user