[fix](Nereids) result error when do agg with distinct under pipeline (#19735)

This commit is contained in:
Zhang Wenxin
2023-05-17 17:08:42 +08:00
committed by GitHub
parent d76e2e2254
commit 6ba2f681af
3 changed files with 21 additions and 7 deletions

View File

@ -391,7 +391,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// TODO: nereids forbid all parallel scan under aggregate temporary, because nereids could generate
// so complex aggregate plan than legacy planner, and should add forbid parallel scan hint when
// generate physical aggregate plan.
if (leftMostNode instanceof OlapScanNode && aggregate.getAggMode() == AggMode.INPUT_TO_RESULT) {
if (leftMostNode instanceof OlapScanNode && aggregate.getAggregateParam().needColocateScan) {
currentFragment.getPlanRoot().setShouldColoScan();
currentFragment.setHasColocatePlanNode(!ConnectContext.get().getSessionVariable().enablePipelineEngine());
}

View File

@ -788,7 +788,7 @@ public class AggregateStrategies implements ImplementationRuleFactory {
.addAll(distinctArguments)
.build();
AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_BUFFER);
AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_BUFFER, true);
Map<AggregateFunction, Alias> nonDistinctAggFunctionToAliasPhase1 = aggregateFunctions.stream()
.filter(aggregateFunction -> !aggregateFunction.isDistinct())
@ -1295,7 +1295,7 @@ public class AggregateStrategies implements ImplementationRuleFactory {
.addAll(distinctArguments)
.build();
AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_BUFFER);
AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_BUFFER, true);
Map<AggregateFunction, Alias> nonDistinctAggFunctionToAliasPhase1 = aggregateFunctions.stream()
.filter(aggregateFunction -> !aggregateFunction.isDistinct())

View File

@ -29,26 +29,39 @@ public class AggregateParam {
public final AggMode aggMode;
// TODO remove this flag, and generate it in enforce and cost job
public boolean needColocateScan;
/** AggregateParam */
public AggregateParam(AggPhase aggPhase, AggMode aggMode) {
this(aggPhase, aggMode, false);
}
/** AggregateParam */
public AggregateParam(AggPhase aggPhase, AggMode aggMode, boolean needColocateScan) {
this.aggMode = Objects.requireNonNull(aggMode, "aggMode cannot be null");
this.aggPhase = Objects.requireNonNull(aggPhase, "aggPhase cannot be null");
this.needColocateScan = needColocateScan;
}
public static AggregateParam localResult() {
return new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_RESULT);
return new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_RESULT, true);
}
public AggregateParam withAggPhase(AggPhase aggPhase) {
return new AggregateParam(aggPhase, aggMode);
return new AggregateParam(aggPhase, aggMode, needColocateScan);
}
public AggregateParam withAggPhase(AggMode aggMode) {
return new AggregateParam(aggPhase, aggMode);
return new AggregateParam(aggPhase, aggMode, needColocateScan);
}
public AggregateParam withAppPhaseAndAppMode(AggPhase aggPhase, AggMode aggMode) {
return new AggregateParam(aggPhase, aggMode);
return new AggregateParam(aggPhase, aggMode, needColocateScan);
}
public AggregateParam withNeedColocateScan(boolean needColocateScan) {
return new AggregateParam(aggPhase, aggMode, needColocateScan);
}
@Override
@ -74,6 +87,7 @@ public class AggregateParam {
return "AggregateParam{"
+ "aggPhase=" + aggPhase
+ ", aggMode=" + aggMode
+ ", needColocateScan=" + needColocateScan
+ '}';
}
}