diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java index 5e3906d93b..f5c11fd4c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java @@ -412,7 +412,7 @@ public class ScalarFunction extends Function { public TFunction toThrift(Type realReturnType, Type[] realArgTypes) { TFunction fn = super.toThrift(realReturnType, realArgTypes); fn.setScalarFn(new TScalarFunction()); - if (getBinaryType() != TFunctionBinaryType.BUILTIN || !VectorizedUtil.optRpcForPipeline()) { + if (getBinaryType() != TFunctionBinaryType.BUILTIN || !VectorizedUtil.isPipeline()) { fn.getScalarFn().setSymbol(symbolName); if (prepareFnSymbol != null) { fn.getScalarFn().setPrepareFnSymbol(prepareFnSymbol); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java index 39dc84ac6b..ade791dd2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java @@ -41,13 +41,4 @@ public class VectorizedUtil { } return connectContext.getSessionVariable().enablePipelineEngine(); } - - public static boolean optRpcForPipeline() { - ConnectContext connectContext = ConnectContext.get(); - if (connectContext == null) { - return false; - } - return connectContext.getSessionVariable().enablePipelineEngine() - && connectContext.getSessionVariable().enableRpcOptForPipeline(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index fdba950667..5aa22b27e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -249,8 +249,6 @@ public class Coordinator { private boolean enablePipelineEngine = false; - private boolean enableRpcOptForPipeline = false; - // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -328,8 +326,6 @@ public class Coordinator { this.returnedAllResults = false; this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin; this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine; - this.enableRpcOptForPipeline = context.getSessionVariable().enablePipelineEngine - && context.getSessionVariable().enableRpcOptForPipeline; initQueryOptions(context); setFromUserProperty(analyzer); @@ -499,7 +495,7 @@ public class Coordinator { public Map getBeToInstancesNum() { Map result = Maps.newTreeMap(); - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) { result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port), ctxs.ctxs.size()); } @@ -644,7 +640,7 @@ public class Coordinator { profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); } if (!isPointQuery) { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { sendPipelineCtx(); } else { sendFragment(); @@ -1273,7 +1269,7 @@ public class Coordinator { } private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { ctx.cancelFragmentInstance(cancelReason); } @@ -2098,7 +2094,7 @@ public class Coordinator { } public void updateFragmentExecStatus(TReportExecStatusParams params) { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { PipelineExecContext ctx = pipelineExecContexts.get(params.getFragmentId()); if (!ctx.updateProfile(params)) { return; @@ -2217,7 +2213,7 @@ public class Coordinator { } public void endProfile(boolean waitProfileDone) { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { if (pipelineExecContexts.isEmpty()) { return; } @@ -2285,7 +2281,7 @@ public class Coordinator { * return true if all of them are OK. Otherwise, return false. */ private boolean checkBackendState() { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { for (PipelineExecContext ctx : needCheckPipelineExecContexts) { if (!ctx.isBackendStateHealthy()) { queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend " @@ -3297,7 +3293,7 @@ public class Coordinator { Lists.newArrayList(); lock(); try { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { for (int index = 0; index < fragments.size(); index++) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { if (fragments.get(index).getFragmentId() != ctx.fragmentId) { @@ -3326,7 +3322,7 @@ public class Coordinator { } private void attachInstanceProfileToFragmentProfile() { - if (enableRpcOptForPipeline) { + if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { if (!ctx.computeTimeInProfile(fragmentProfile.size())) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 0fafbe4c7a..e218b0af7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -523,9 +523,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true) public boolean enablePipelineEngine = false; - @VariableMgr.VarAttr(name = ENABLE_RPC_OPT_FOR_PIPELINE) - public boolean enableRpcOptForPipeline = true; - @VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE) public boolean enableParallelOutfile = false; @@ -1335,10 +1332,6 @@ public class SessionVariable implements Serializable, Writable { return enablePipelineEngine; } - public boolean enableRpcOptForPipeline() { - return enableRpcOptForPipeline; - } - public void setEnablePipelineEngine(boolean enablePipelineEngine) { this.enablePipelineEngine = enablePipelineEngine; }