diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PlanPreprocessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PlanPreprocessors.java index 5b5213801f..108d52d430 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PlanPreprocessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PlanPreprocessors.java @@ -46,7 +46,8 @@ public class PlanPreprocessors { public List getProcessors() { // add processor if we need return ImmutableList.of( - new EliminateLogicalSelectHint() + new EliminateLogicalSelectHint(), + new TurnOffPipelineForDml() ); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java new file mode 100644 index 0000000000..79afe5d2ea --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.pre; + +import org.apache.doris.analysis.SetVar; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.VariableMgr; + +/** + * TODO turnoff pipeline for any dml temporary, remove this pre-process when pipeline-sink is ok. + */ +public class TurnOffPipelineForDml extends PlanPreprocessor { + + @Override + public Plan visitUnboundOlapTableSink(UnboundOlapTableSink unboundOlapTableSink, + StatementContext context) { + turnOffPipeline(context); + return unboundOlapTableSink; + } + + @Override + public Plan visitLogicalFileSink(LogicalFileSink fileSink, StatementContext context) { + turnOffPipeline(context); + return fileSink; + } + + private void turnOffPipeline(StatementContext context) { + SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); + // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute + sessionVariable.setIsSingleSetVar(true); + try { + VariableMgr.setVar(sessionVariable, + new SetVar(SessionVariable.ENABLE_PIPELINE_ENGINE, new StringLiteral("false"))); + } catch (Throwable t) { + throw new AnalysisException("Can not set turn off pipeline for DML", t); + } + } +} diff --git a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy index e7d5873d36..6d327366c4 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy @@ -24,6 +24,17 @@ suite('nereids_insert_no_partition') { sql 'set enable_nereids_dml=true' sql 'set parallel_fragment_exec_instance_num=13' + explain { + // TODO: test turn off pipeline when dml, remove it if pipeline sink is ok + sql ''' + insert into uni_light_sc_mow_not_null_nop_t with t as( + select * except(kaint) from src where id is not null) + select * from t left semi join t t2 on t.id = t2.id; + ''' + + notContains("MultiCastDataSinks") + } + sql '''insert into agg_nop_t select * except(kaint) from src''' sql 'sync'