[feature](pipelineX) add local_shuffle in set_operation / assert_num operator (#28293)

This commit is contained in:
Mryange
2023-12-13 15:15:20 +08:00
committed by GitHub
parent cd6d75e518
commit 5f66335e54
8 changed files with 41 additions and 5 deletions

View File

@ -1755,6 +1755,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
&& findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) {
setOperationFragment.setHasColocatePlanNode(true);
setOperationNode.setColocate(true);
}
return setOperationFragment;

View File

@ -86,6 +86,8 @@ public abstract class SetOperationNode extends PlanNode {
protected final TupleId tupleId;
private boolean isColocate = false;
protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName, StatisticalType statisticalType) {
super(id, tupleId.asList(), planNodeName, statisticalType);
this.setOpResultExprs = Lists.newArrayList();
@ -124,6 +126,10 @@ public abstract class SetOperationNode extends PlanNode {
resultExprLists.add(exprs);
}
public void setColocate(boolean colocate) {
this.isColocate = colocate;
}
/**
* Returns true if this UnionNode has only constant exprs.
*/
@ -395,11 +401,13 @@ public abstract class SetOperationNode extends PlanNode {
case INTERSECT_NODE:
msg.intersect_node = new TIntersectNode(
tupleId.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx);
msg.intersect_node.setIsColocate(isColocate);
msg.node_type = TPlanNodeType.INTERSECT_NODE;
break;
case EXCEPT_NODE:
msg.except_node = new TExceptNode(
tupleId.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx);
msg.except_node.setIsColocate(isColocate);
msg.node_type = TPlanNodeType.EXCEPT_NODE;
break;
default: