[pipelineX](improvement) enable local shuffle by default (#28046)

This commit is contained in:
Gabriel
2023-12-06 16:39:48 +08:00
committed by GitHub
parent fa5096f510
commit 28817990b7
10 changed files with 43 additions and 19 deletions

View File

@ -860,6 +860,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
&& inputPlanFragment.getDataPartition().getType() != TPartitionType.RANDOM
&& aggregate.getAggregateParam().aggMode != AggMode.INPUT_TO_BUFFER) {
inputPlanFragment.setHasColocatePlanNode(true);
// Set colocate info in agg node. This is a hint for local shuffling to decide which type of
// local exchanger will be used.
aggregationNode.setColocate(true);
}
setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {

View File

@ -59,6 +59,7 @@ public class AggregationNode extends PlanNode {
// Set to true if this aggregation node needs to run the Finalize step. This
// node is the root node of a distributed aggregation.
private boolean needsFinalize;
private boolean isColocate = false;
// If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
private boolean useStreamingPreagg;
@ -277,6 +278,7 @@ public class AggregationNode extends PlanNode {
msg.agg_node.setAggSortInfos(aggSortInfos);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
msg.agg_node.setIsColocate(isColocate);
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
@ -375,4 +377,8 @@ public class AggregationNode extends PlanNode {
aggInfo.getOutputTupleDesc().computeMemLayout();
}
}
public void setColocate(boolean colocate) {
isColocate = colocate;
}
}

View File

@ -302,6 +302,7 @@ public class Coordinator implements CoordInterface {
initQueryOptions(context);
if (planner instanceof OriginalPlanner) {
// Enable local shuffle on pipelineX engine only if Nereids planner is applied.
queryOptions.setEnableLocalShuffle(false);
}

View File

@ -770,8 +770,11 @@ public class SessionVariable implements Serializable, Writable {
needForward = true)
private boolean enableSharedScan = false;
@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enableLocalShuffle = false;
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
"Whether to enable local shuffle on pipelineX engine."})
private boolean enableLocalShuffle = true;
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
public boolean enableAggState = false;