[Refactor](pipeline) Remove unless fe session variable enable_rpc_opt_for_pipeline (#18019)

This commit is contained in:
HappenLee
2023-03-23 07:27:58 +08:00
committed by GitHub
parent d9059ef070
commit 58b00858ab
4 changed files with 9 additions and 29 deletions

View File

@ -412,7 +412,7 @@ public class ScalarFunction extends Function {
public TFunction toThrift(Type realReturnType, Type[] realArgTypes) {
TFunction fn = super.toThrift(realReturnType, realArgTypes);
fn.setScalarFn(new TScalarFunction());
if (getBinaryType() != TFunctionBinaryType.BUILTIN || !VectorizedUtil.optRpcForPipeline()) {
if (getBinaryType() != TFunctionBinaryType.BUILTIN || !VectorizedUtil.isPipeline()) {
fn.getScalarFn().setSymbol(symbolName);
if (prepareFnSymbol != null) {
fn.getScalarFn().setPrepareFnSymbol(prepareFnSymbol);

View File

@ -41,13 +41,4 @@ public class VectorizedUtil {
}
return connectContext.getSessionVariable().enablePipelineEngine();
}
public static boolean optRpcForPipeline() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return false;
}
return connectContext.getSessionVariable().enablePipelineEngine()
&& connectContext.getSessionVariable().enableRpcOptForPipeline();
}
}

View File

@ -249,8 +249,6 @@ public class Coordinator {
private boolean enablePipelineEngine = false;
private boolean enableRpcOptForPipeline = false;
// Runtime filter merge instance address and ID
public TNetworkAddress runtimeFilterMergeAddr;
public TUniqueId runtimeFilterMergeInstanceId;
@ -328,8 +326,6 @@ public class Coordinator {
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine;
this.enableRpcOptForPipeline = context.getSessionVariable().enablePipelineEngine
&& context.getSessionVariable().enableRpcOptForPipeline;
initQueryOptions(context);
setFromUserProperty(analyzer);
@ -499,7 +495,7 @@ public class Coordinator {
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port), ctxs.ctxs.size());
}
@ -644,7 +640,7 @@ public class Coordinator {
profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */);
}
if (!isPointQuery) {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
sendFragment();
@ -1273,7 +1269,7 @@ public class Coordinator {
}
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
ctx.cancelFragmentInstance(cancelReason);
}
@ -2098,7 +2094,7 @@ public class Coordinator {
}
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
PipelineExecContext ctx = pipelineExecContexts.get(params.getFragmentId());
if (!ctx.updateProfile(params)) {
return;
@ -2217,7 +2213,7 @@ public class Coordinator {
}
public void endProfile(boolean waitProfileDone) {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
if (pipelineExecContexts.isEmpty()) {
return;
}
@ -2285,7 +2281,7 @@ public class Coordinator {
* return true if all of them are OK. Otherwise, return false.
*/
private boolean checkBackendState() {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
if (!ctx.isBackendStateHealthy()) {
queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
@ -3297,7 +3293,7 @@ public class Coordinator {
Lists.newArrayList();
lock();
try {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
for (int index = 0; index < fragments.size(); index++) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
@ -3326,7 +3322,7 @@ public class Coordinator {
}
private void attachInstanceProfileToFragmentProfile() {
if (enableRpcOptForPipeline) {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
return;

View File

@ -523,9 +523,6 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true)
public boolean enablePipelineEngine = false;
@VariableMgr.VarAttr(name = ENABLE_RPC_OPT_FOR_PIPELINE)
public boolean enableRpcOptForPipeline = true;
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
public boolean enableParallelOutfile = false;
@ -1335,10 +1332,6 @@ public class SessionVariable implements Serializable, Writable {
return enablePipelineEngine;
}
public boolean enableRpcOptForPipeline() {
return enableRpcOptForPipeline;
}
public void setEnablePipelineEngine(boolean enablePipelineEngine) {
this.enablePipelineEngine = enablePipelineEngine;
}