[pipelineX](localexchange) Add local exchange before TabletFunction (#30446)
* [pipelineX](localexchange) Add local exchange before TabletFunction * update
This commit is contained in:
@ -2042,10 +2042,12 @@ public class Coordinator implements CoordInterface {
|
||||
boolean sharedScan = true;
|
||||
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
|
||||
leftMostNode.getNumInstances());
|
||||
boolean ignoreStorageDataDistribution = scanNodes.stream()
|
||||
boolean forceToLocalShuffle = context != null
|
||||
&& context.getSessionVariable().isForceToLocalShuffle();
|
||||
boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
|
||||
.allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
|
||||
fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
|
||||
.scanRangeAssignment.size())) && useNereids;
|
||||
.scanRangeAssignment.size())) && useNereids);
|
||||
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|
||||
|| ignoreStorageDataDistribution)) {
|
||||
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
|
||||
@ -2917,13 +2919,15 @@ public class Coordinator implements CoordInterface {
|
||||
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
|
||||
* 2. Use Nereids planner.
|
||||
*/
|
||||
boolean ignoreStorageDataDistribution = scanNodes.stream()
|
||||
boolean forceToLocalShuffle = context != null
|
||||
&& context.getSessionVariable().isForceToLocalShuffle();
|
||||
boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
|
||||
.allMatch(node -> node.ignoreStorageDataDistribution(context,
|
||||
fragmentExecParamsMap.get(node.getFragment().getFragmentId())
|
||||
.scanRangeAssignment.size()))
|
||||
&& addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
|
||||
return addressScanRange.getValue().size() < parallelExecInstanceNum;
|
||||
}) && useNereids;
|
||||
}) && useNereids);
|
||||
|
||||
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
|
||||
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
|
||||
|
||||
@ -235,6 +235,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
|
||||
|
||||
public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle";
|
||||
|
||||
public static final String ENABLE_AGG_STATE = "enable_agg_state";
|
||||
|
||||
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
|
||||
@ -845,6 +847,12 @@ public class SessionVariable implements Serializable, Writable {
|
||||
"Whether to enable local shuffle on pipelineX engine."})
|
||||
private boolean enableLocalShuffle = true;
|
||||
|
||||
@VariableMgr.VarAttr(
|
||||
name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
|
||||
description = {"是否在pipelineX引擎上强制开启local shuffle优化",
|
||||
"Whether to force to local shuffle on pipelineX engine."})
|
||||
private boolean forceToLocalShuffle = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
|
||||
needForward = true)
|
||||
public boolean enableAggState = false;
|
||||
@ -3303,4 +3311,12 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public void setForceJniScanner(boolean force) {
|
||||
forceJniScanner = force;
|
||||
}
|
||||
|
||||
public boolean isForceToLocalShuffle() {
|
||||
return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle;
|
||||
}
|
||||
|
||||
public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
|
||||
this.forceToLocalShuffle = forceToLocalShuffle;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user