[Opt](pipeline) disable coloagg when the para instance num >= tablet_num * 2 (#18030)
This commit is contained in:
@ -925,7 +925,8 @@ public class DistributedPlanner {
|
||||
if (isDistinct) {
|
||||
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
|
||||
} else {
|
||||
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())) {
|
||||
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())
|
||||
&& childFragment.getPlanRoot().shouldColoAgg()) {
|
||||
childFragment.addPlanRoot(node);
|
||||
childFragment.setHasColocatePlanNode(true);
|
||||
return childFragment;
|
||||
|
||||
@ -1145,6 +1145,18 @@ public class OlapScanNode extends ScanNode {
|
||||
return result.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldColoAgg() {
|
||||
// In pipeline exec engine, the instance num is parallel instance. we should disable colo agg
|
||||
// in parallelInstance >= tablet_num * 2 to use more thread to speed up the query
|
||||
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
|
||||
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
|
||||
return parallelInstance < result.size() * 2;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
List<String> keyColumnNames = new ArrayList<String>();
|
||||
|
||||
@ -831,6 +831,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
return numInstances;
|
||||
}
|
||||
|
||||
public boolean shouldColoAgg() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void setNumInstances(int numInstances) {
|
||||
this.numInstances = numInstances;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user