[fix](colocate) Fix the error colocate plan when query is (rollup + instance >1) (#8779)
The Repeat Node will change the fragment data partition. So the output partition of child fragment is different from the data partition of current fragment. When judging whether colocate can be enabled, the current data partition of fragment should be used directly instead of the child's output partition. Before this PR fix, queries with '''rollup + concurrency greater than 1''' may have incorrect results. For example: ``` select t1.tc1,t1.tc2,sum(t1.tc3) as total from t1 join[shuffle] t1 t2 on t1.tc1=t2.tc1 group by rollup(tc1,tc2) order by t1.tc1,t1.tc2,total; ``` Fixed #8778
This commit is contained in:
@ -927,7 +927,7 @@ public class DistributedPlanner {
|
||||
if (isDistinct) {
|
||||
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
|
||||
} else {
|
||||
if (canColocateAgg(node.getAggInfo(), childFragment.getInputDataPartition())) {
|
||||
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())) {
|
||||
childFragment.addPlanRoot(node);
|
||||
childFragment.setHasColocatePlanNode(true);
|
||||
return childFragment;
|
||||
@ -942,7 +942,7 @@ public class DistributedPlanner {
|
||||
* 1. Session variables disable_colocate_plan = false
|
||||
* 2. The input data partition of child fragment < agg node partition exprs
|
||||
*/
|
||||
private boolean canColocateAgg(AggregateInfo aggregateInfo, List<DataPartition> childFragmentDataPartition) {
|
||||
private boolean canColocateAgg(AggregateInfo aggregateInfo, DataPartition childFragmentDataPartition) {
|
||||
// Condition1
|
||||
if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
|
||||
LOG.debug("Agg node is not colocate in:" + ConnectContext.get().queryId()
|
||||
@ -952,10 +952,8 @@ public class DistributedPlanner {
|
||||
|
||||
// Condition2
|
||||
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
|
||||
for (DataPartition childDataPartition : childFragmentDataPartition) {
|
||||
if (dataPartitionMatchAggInfo(childDataPartition, aggPartitionExprs)) {
|
||||
return true;
|
||||
}
|
||||
if (dataPartitionMatchAggInfo(childFragmentDataPartition, aggPartitionExprs)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -329,15 +329,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
dest.addChild(this);
|
||||
}
|
||||
|
||||
public List<DataPartition> getInputDataPartition() {
|
||||
List<DataPartition> result = Lists.newArrayList();
|
||||
result.add(getDataPartition());
|
||||
for (PlanFragment child : children) {
|
||||
result.add(child.getOutputPartition());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public DataPartition getDataPartition() {
|
||||
return dataPartition;
|
||||
}
|
||||
|
||||
@ -186,4 +186,23 @@ public class ColocatePlanTest {
|
||||
Assert.assertTrue(isColocateFragment1);
|
||||
}
|
||||
|
||||
// Fix #8778
|
||||
@Test
|
||||
public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception {
|
||||
String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) "
|
||||
+ "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');";
|
||||
CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().createTable(createColocateTableStmt);
|
||||
|
||||
String sql = "select a.k1, a.k2, sum(a.k3) "
|
||||
+ "from db1.test_colocate_one_backend a join[shuffle] db1.test_colocate_one_backend b on a.k1=b.k1 "
|
||||
+ "group by rollup(a.k1, a.k2);";
|
||||
Deencapsulation.setField(ctx.getSessionVariable(), "parallelExecInstanceNum", 2);
|
||||
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
|
||||
Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
|
||||
Assert.assertEquals(5, StringUtils.countMatches(plan1, "PLAN FRAGMENT"));
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user