[fix](nereids)disable parallel scan in some case (#25089)
This commit is contained in:
@ -63,6 +63,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated;
|
||||
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
|
||||
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
|
||||
import org.apache.doris.nereids.properties.OrderKey;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
|
||||
import org.apache.doris.nereids.stats.StatsErrorEstimator;
|
||||
import org.apache.doris.nereids.trees.UnaryNode;
|
||||
@ -1537,6 +1538,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
PartitionSortNode partitionSortNode = translatePartitionSortNode(
|
||||
partitionTopN, inputFragment.getPlanRoot(), context);
|
||||
addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
|
||||
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
|
||||
// we need turn of parallel scan to ensure to get correct result.
|
||||
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
|
||||
if (findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
|
||||
inputFragment.setHasColocatePlanNode(true);
|
||||
}
|
||||
return inputFragment;
|
||||
}
|
||||
|
||||
@ -1737,6 +1744,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
setPlanRoot(setOperationFragment, setOperationNode, setOperation);
|
||||
}
|
||||
|
||||
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
|
||||
// we need turn of parallel scan to ensure to get correct result.
|
||||
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
|
||||
if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
|
||||
&& findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) {
|
||||
setOperationFragment.setHasColocatePlanNode(true);
|
||||
}
|
||||
|
||||
return setOperationFragment;
|
||||
}
|
||||
|
||||
@ -1994,6 +2009,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
bufferedTupleDesc
|
||||
);
|
||||
inputPlanFragment.addPlanRoot(analyticEvalNode);
|
||||
|
||||
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
|
||||
// we need turn of parallel scan to ensure to get correct result.
|
||||
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
|
||||
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
|
||||
inputPlanFragment.setHasColocatePlanNode(true);
|
||||
}
|
||||
return inputPlanFragment;
|
||||
}
|
||||
|
||||
@ -2359,4 +2381,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
return getCTEConsumerChild((PhysicalPlan) root.child(0));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) {
|
||||
if (root instanceof OlapScanNode) {
|
||||
return true;
|
||||
} else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) {
|
||||
return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -326,6 +326,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
}
|
||||
str.append("\n");
|
||||
str.append(" PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n");
|
||||
str.append(" HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n");
|
||||
str.append("\n");
|
||||
if (sink != null) {
|
||||
str.append(sink.getExplainString(" ", explainLevel) + "\n");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user