From e9ff3d185b94d982ea0abe43e230100b2513dc26 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 23 Mar 2023 15:53:13 +0800 Subject: [PATCH] [Opt](pipeline) disable coloagg when the para instance num >= tablet_num * 2 (#18030) --- .../org/apache/doris/planner/DistributedPlanner.java | 3 ++- .../java/org/apache/doris/planner/OlapScanNode.java | 12 ++++++++++++ .../main/java/org/apache/doris/planner/PlanNode.java | 4 ++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 88bd7650fb..5999f33406 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -925,7 +925,8 @@ public class DistributedPlanner { if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())) { + if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition()) + && childFragment.getPlanRoot().shouldColoAgg()) { childFragment.addPlanRoot(node); childFragment.setHasColocatePlanNode(true); return childFragment; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 958075508b..d4d7ff4de0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1145,6 +1145,18 @@ public class OlapScanNode extends ScanNode { return result.size(); } + @Override + public boolean shouldColoAgg() { + // In pipeline exec engine, the instance num is parallel instance. we should disable colo agg + // in parallelInstance >= tablet_num * 2 to use more thread to speed up the query + if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) { + int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + return parallelInstance < result.size() * 2; + } else { + return true; + } + } + @Override protected void toThrift(TPlanNode msg) { List keyColumnNames = new ArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 7f065a5856..0b10c3afe2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -831,6 +831,10 @@ public abstract class PlanNode extends TreeNode implements PlanStats { return numInstances; } + public boolean shouldColoAgg() { + return true; + } + public void setNumInstances(int numInstances) { this.numInstances = numInstances; }