[Bug][pipeline] Fix regression tpcds failed in nereid planner (#19885)

This commit is contained in:
HappenLee
2023-05-19 22:30:48 +08:00
committed by GitHub
parent 24b2fab943
commit 77dfdfdd50
2 changed files with 9 additions and 7 deletions

View File

@ -146,7 +146,6 @@ import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
@ -391,8 +390,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// so complex aggregate plan than legacy planner, and should add forbid parallel scan hint when
// generate physical aggregate plan.
if (leftMostNode instanceof OlapScanNode && aggregate.getAggregateParam().needColocateScan) {
currentFragment.getPlanRoot().setShouldColoScan();
currentFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine());
currentFragment.setHasColocatePlanNode(true);
}
setPlanRoot(currentFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {

View File

@ -931,12 +931,16 @@ public class DistributedPlanner {
if (isDistinct) {
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
} else {
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())
|| childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())) {
if (canColocateAgg(node.getAggInfo(), childFragment.getDataPartition())) {
childFragment.addPlanRoot(node);
childFragment.setHasColocatePlanNode(true);
return childFragment;
} else if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
&& childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())
&& childFragment.getPlanRoot() instanceof OlapScanNode) {
childFragment.getPlanRoot().setShouldColoScan();
childFragment.addPlanRoot(node);
// pipeline here should use shared scan to improve performance
childFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine());
childFragment.setHasColocatePlanNode(false);
return childFragment;
} else {
return createMergeAggregationFragment(node, childFragment);