diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index a6c90e3dec..c8f64a447a 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -96,6 +96,10 @@ public: local_state._child_source_state != SourceState::FINISHED; } + DataDistribution required_data_distribution() const override { + return {ExchangeType::PASSTHROUGH}; + } + Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state) const override { auto& local_state = get_local_state(state); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 099aedd7e1..cca9014a71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2042,10 +2042,12 @@ public class Coordinator implements CoordInterface { boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = scanNodes.stream() + boolean forceToLocalShuffle = context != null + && context.getSessionVariable().isForceToLocalShuffle(); + boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId()) - .scanRangeAssignment.size())) && useNereids; + .scanRangeAssignment.size())) && useNereids); if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2917,13 +2919,15 @@ public class Coordinator implements CoordInterface { * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. * 2. Use Nereids planner. */ - boolean ignoreStorageDataDistribution = scanNodes.stream() + boolean forceToLocalShuffle = context != null + && context.getSessionVariable().isForceToLocalShuffle(); + boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, fragmentExecParamsMap.get(node.getFragment().getFragmentId()) .scanRangeAssignment.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; - }) && useNereids; + }) && useNereids); FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>>> addressScanRange diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 84809f31b8..cef6babc80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -235,6 +235,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; + public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle"; + public static final String ENABLE_AGG_STATE = "enable_agg_state"; public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline"; @@ -845,6 +847,12 @@ public class SessionVariable implements Serializable, Writable { "Whether to enable local shuffle on pipelineX engine."}) private boolean enableLocalShuffle = true; + @VariableMgr.VarAttr( + name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + description = {"是否在pipelineX引擎上强制开启local shuffle优化", + "Whether to force to local shuffle on pipelineX engine."}) + private boolean forceToLocalShuffle = false; + @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) public boolean enableAggState = false; @@ -3303,4 +3311,12 @@ public class SessionVariable implements Serializable, Writable { public void setForceJniScanner(boolean force) { forceJniScanner = force; } + + public boolean isForceToLocalShuffle() { + return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle; + } + + public void setForceToLocalShuffle(boolean forceToLocalShuffle) { + this.forceToLocalShuffle = forceToLocalShuffle; + } }