diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8f61a499a7..b0126bc53d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -1043,6 +1043,37 @@ public class DistributedPlanner { throws UserException { repeatNode.setNumInstances(childFragment.getPlanRoot().getNumInstances()); childFragment.addPlanRoot(repeatNode); + /* + The Repeat Node changes the data partition of the fragment + when the original data partition of the fragment is HashPartition. + For example, + Query: SELECT k1, k2, sum(v1) + FROM table + GROUP BY GROUPING SETS ((k1, k2), (k1), (k2), ( )) + Table schema: table distributed by k1 + The Child Fragment: + Fragment 0 + Data partition: k1 + Repeat Node: repeat 3 lines [[0, 1], [0], [1], []] + OlapScanNode: table + Data before Repeat Node is partitioned by k1, such as: + | Node 1 | | Node 2 | + | 1, 1 | | 2, 1 | + | 1, 2 | | 2, 2 | + Data after Repeat Node is partitioned by RANDOM, such as: + | Node 1 | | Node 2 | + | 1, 1 | | 2, 1 | + | 1, 2 | | 2, 2 | + | null,1 | | null,1 | + | null,2 | | null,2 | + ... + The Repeat Node will generate some new rows. + The distribution of these new rows is completely inconsistent with the original data distribution; + their distribution is RANDOM. + Therefore, the data distribution method of the fragment needs to be modified here. + Only with the correct data distribution can the **colocate** judgment produce the correct result. 
+ */ + childFragment.updateDataPartition(DataPartition.RANDOM); return childFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 234e7d3bd8..f714132b7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -96,7 +96,7 @@ public class PlanFragment extends TreeNode { // specification of the partition of the input of this fragment; // an UNPARTITIONED fragment is executed on only a single node // TODO: improve this comment, "input" is a bit misleading - private final DataPartition dataPartition; + private DataPartition dataPartition; // specification of the actually input partition of this fragment when transmitting to be. // By default, the value of the data partition in planner and the data partition transmitted to be are the same. @@ -267,6 +267,20 @@ public class PlanFragment extends TreeNode { return (dataPartition.getType() != TPartitionType.UNPARTITIONED); } + /** + * Replaces this fragment's data partition, unless the fragment is currently UNPARTITIONED. + * An UNPARTITIONED fragment is executed on only a single node, so it is left unchanged. + * + * @param dataPartition the new data partition for this fragment + */ + public void updateDataPartition(DataPartition dataPartition) { + // Compare by partition type rather than by reference so that any UNPARTITIONED instance is recognized. + if (this.dataPartition.getType() == TPartitionType.UNPARTITIONED) { + return; + } + this.dataPartition = dataPartition; + } + public PlanFragmentId getId() { return fragmentId; } public PlanFragment getDestFragment() {