From e82d8433bee672fbb8162b18e1fa68b791e60386 Mon Sep 17 00:00:00 2001 From: Kikyou1997 <33112463+Kikyou1997@users.noreply.github.com> Date: Fri, 24 Jun 2022 18:26:26 +0800 Subject: [PATCH] [feature](nereids) Integrate nereids into current SQL process framework (#10304) To integrate the nereids optimizer with new SQLParser and Planner to existing SQL process framework, I abstract a interface which named "Planner" and let the Both planner from nereids and stale optimizer implement it, to disguish it with origin Planner, I rename the Planner to OriginalPlanner. As we don't want to impact the existing logic too much, I defined a LogicalPlanAdapter to adapt the logicalPlan that is the output of the new paser to the existing code. Besides, as the MySQL protocol supports sending multiple statements in one packet, so I add Nereids#SparseSQL method to handle this properly. --- .../doris/load/update/UpdatePlanner.java | 5 +- .../{Planner.java => NereidsPlanner.java} | 49 ++- .../nereids/parser/LogicalPlanBuilder.java | 14 + .../doris/nereids/parser/NereidsParser.java | 108 +++++ .../doris/nereids/parser/SqlParser.java | 85 ---- .../properties/PhysicalProperties.java | 2 + .../org/apache/doris/nereids/qe/Executor.java | 59 --- .../trees/plans/PhysicalPlanTranslator.java | 11 +- .../nereids/trees/plans/PlanContext.java | 14 +- .../plans/logical/LogicalPlanAdapter.java | 44 ++ .../apache/doris/planner/OriginalPlanner.java | 401 ++++++++++++++++++ .../org/apache/doris/planner/Planner.java | 379 +---------------- .../org/apache/doris/qe/ConnectProcessor.java | 14 +- .../org/apache/doris/qe/SessionVariable.java | 19 + .../org/apache/doris/qe/StmtExecutor.java | 25 +- .../apache/doris/analysis/SelectStmtTest.java | 12 +- .../org/apache/doris/nereids/AnalyzeTest.java | 8 +- .../nereids/parser/NereidsParserTest.java | 51 +++ .../rewrite/ExpressionRewriteTest.java | 8 +- .../expressions/ExpressionParserTest.java | 8 +- .../doris/planner/DistributedPlannerTest.java | 12 +- .../org/apache/doris/planner/PlannerTest.java | 48 +-- .../org/apache/doris/qe/CoordinatorTest.java | 28 +- .../org/apache/doris/qe/StmtExecutorTest.java | 6 +- .../org/apache/doris/utframe/DorisAssert.java | 2 +- .../doris/utframe/TestWithFeService.java | 2 +- .../apache/doris/utframe/UtFrameUtils.java | 2 +- 27 files changed, 817 insertions(+), 599 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{Planner.java => NereidsPlanner.java} (67%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/parser/SqlParser.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/qe/Executor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPlanAdapter.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java index 887e2b33e1..51a28779dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java @@ -35,10 +35,10 @@ import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import com.google.common.base.Preconditions; @@ -49,7 +49,7 @@ import java.util.List; import java.util.Map; -public class UpdatePlanner extends Planner { +public class UpdatePlanner extends OriginalPlanner { private final IdGenerator nodeIdGenerator = PlanNodeId.createGenerator(); private final IdGenerator fragmentIdGenerator = @@ -65,6 +65,7 @@ public class UpdatePlanner extends Planner { public UpdatePlanner(long dbId, OlapTable targetTable, List setExprs, TupleDescriptor srcTupleDesc, Analyzer analyzer) { + super(analyzer); this.targetDBId = dbId; this.targetTable = targetTable; this.setExprs = setExprs; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java similarity index 67% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/Planner.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index ab9396b35e..5af438c3b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -17,27 +17,64 @@ package org.apache.doris.nereids; +import org.apache.doris.analysis.StatementBase; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob; import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.memo.Memo; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.plans.PhysicalPlanTranslator; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanContext; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.List; /** * Planner to do query plan in Nereids. */ -public class Planner { +public class NereidsPlanner extends Planner { + private PlannerContext plannerContext; + private final ConnectContext ctx; + private List scanNodeList = null; + + public NereidsPlanner(ConnectContext ctx) { + this.ctx = ctx; + } + + @Override + public void plan(StatementBase queryStmt, + org.apache.doris.thrift.TQueryOptions queryOptions) throws UserException { + if (!(queryStmt instanceof LogicalPlanAdapter)) { + throw new RuntimeException("Wrong type of queryStmt, expected: "); + } + LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt; + PhysicalPlan physicalPlan = plan(logicalPlanAdapter.getLogicalPlan(), new PhysicalProperties(), ctx); + PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(); + PlanContext planContext = new PlanContext(); + physicalPlanTranslator.translatePlan(physicalPlan, planContext); + fragments = new ArrayList<>(planContext.getPlanFragmentList()); + PlanFragment root = fragments.get(fragments.size() - 1); + root.setOutputExprs(queryStmt.getResultExprs()); + if (VectorizedUtil.isVectorized()) { + root.getPlanRoot().convertToVectoriezd(); + } + scanNodeList = planContext.getScanNodeList(); + } /** * Do analyze and optimize for query plan. @@ -67,6 +104,11 @@ public class Planner { return getRoot().extractPlan(); } + @Override + public List getScanNodes() { + return scanNodeList; + } + public Group getRoot() { return plannerContext.getOptimizerContext().getMemo().getRoot(); } @@ -93,4 +135,9 @@ public class Planner { return physicalPlan; } + + @Override + public boolean isBlockQuery() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 23b697ede3..6b83a63f73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.DorisParser.JoinCriteriaContext; import org.apache.doris.nereids.DorisParser.JoinRelationContext; import org.apache.doris.nereids.DorisParser.LogicalBinaryContext; import org.apache.doris.nereids.DorisParser.LogicalNotContext; +import org.apache.doris.nereids.DorisParser.MultiStatementsContext; import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext; import org.apache.doris.nereids.DorisParser.NamedExpressionContext; import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext; @@ -52,6 +53,7 @@ import org.apache.doris.nereids.DorisParser.SelectClauseContext; import org.apache.doris.nereids.DorisParser.SingleStatementContext; import org.apache.doris.nereids.DorisParser.SortItemContext; import org.apache.doris.nereids.DorisParser.StarContext; +import org.apache.doris.nereids.DorisParser.StatementContext; import org.apache.doris.nereids.DorisParser.StringLiteralContext; import org.apache.doris.nereids.DorisParser.TableNameContext; import org.apache.doris.nereids.DorisParser.WhereClauseContext; @@ -147,6 +149,18 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { return ParserUtils.withOrigin(ctx, f); } + /** + * Visit multi-statements. + */ + public Object visitMultiStatements(MultiStatementsContext ctx) { + List logicalPlanList = new ArrayList<>(); + for (StatementContext stmtCtx : ctx.statement()) { + LogicalPlan logicalPlan = (LogicalPlan) visit(stmtCtx); + logicalPlanList.add(logicalPlan); + } + return logicalPlanList; + } + /* ******************************************************************************************** * Plan parsing * ******************************************************************************************** */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java new file mode 100644 index 0000000000..5f294cafa4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.parser; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.nereids.DorisLexer; +import org.apache.doris.nereids.DorisParser; +import org.apache.doris.nereids.exceptions.ParsingException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter; + +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.atn.PredictionMode; +import org.antlr.v4.runtime.misc.ParseCancellationException; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** + * Sql parser, convert sql DSL to logical plan. + */ +public class NereidsParser { + + /** + * In MySQL protocol, client could send multi-statement in. + * a single packet. + * https://dev.mysql.com/doc/internals/en/com-set-option.html + */ + public List parseSQL(String originStr) throws Exception { + List logicalPlanList = parseMultiple(originStr); + List statementBaseList = new ArrayList<>(); + for (LogicalPlan logicalPlan : logicalPlanList) { + LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalPlan); + statementBaseList.add(logicalPlanAdapter); + } + return statementBaseList; + } + + /** + * parse sql DSL string. + * + * @param sql sql string + * @return logical plan + */ + public LogicalPlan parseSingle(String sql) throws Exception { + return (LogicalPlan) parse(sql, DorisParser::singleStatement); + } + + public List parseMultiple(String sql) throws Exception { + return (List) parse(sql, DorisParser::multiStatements); + } + + private Object parse(String sql, Function parseFunction) { + try { + ParserRuleContext tree = toAst(sql, parseFunction); + LogicalPlanBuilder logicalPlanBuilder = new LogicalPlanBuilder(); + return logicalPlanBuilder.visit(tree); + } catch (StackOverflowError e) { + throw new ParsingException(e.getMessage()); + } + } + + private ParserRuleContext toAst(String sql, Function parseFunction) { + DorisLexer lexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql))); + CommonTokenStream tokenStream = new CommonTokenStream(lexer); + DorisParser parser = new DorisParser(tokenStream); + // parser.addParseListener(PostProcessor) + // parser.removeErrorListeners() + // parser.addErrorListener(ParseErrorListener) + ParserRuleContext tree; + try { + // first, try parsing with potentially faster SLL mode + parser.getInterpreter().setPredictionMode(PredictionMode.SLL); + tree = parseFunction.apply(parser); + } catch (ParseCancellationException ex) { + // if we fail, parse with LL mode + tokenStream.seek(0); // rewind input stream + parser.reset(); + + parser.getInterpreter().setPredictionMode(PredictionMode.LL); + tree = parseFunction.apply(parser); + } + return tree; + } + + public Expression createExpression(String expression) { + return (Expression) parse(expression, DorisParser::expression); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/SqlParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/SqlParser.java deleted file mode 100644 index b46633c46a..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/SqlParser.java +++ /dev/null @@ -1,85 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.parser; - -import org.apache.doris.nereids.DorisLexer; -import org.apache.doris.nereids.DorisParser; -import org.apache.doris.nereids.exceptions.ParsingException; -import org.apache.doris.nereids.trees.TreeNode; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; - -import org.antlr.v4.runtime.CharStreams; -import org.antlr.v4.runtime.CommonTokenStream; -import org.antlr.v4.runtime.ParserRuleContext; -import org.antlr.v4.runtime.atn.PredictionMode; -import org.antlr.v4.runtime.misc.ParseCancellationException; - -import java.util.function.Function; - -/** - * Sql parser, convert sql DSL to logical plan. - */ -public class SqlParser { - - /** - * parse sql DSL string. - * - * @param sql sql string - * @return logical plan - */ - public LogicalPlan parse(String sql) { - return (LogicalPlan) parse(sql, DorisParser::singleStatement); - } - - private TreeNode parse(String sql, Function parseFunction) { - try { - DorisLexer lexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql))); - CommonTokenStream tokenStream = new CommonTokenStream(lexer); - DorisParser parser = new DorisParser(tokenStream); - - // parser.addParseListener(PostProcessor) - // parser.removeErrorListeners() - // parser.addErrorListener(ParseErrorListener) - - ParserRuleContext tree; - try { - // first, try parsing with potentially faster SLL mode - parser.getInterpreter().setPredictionMode(PredictionMode.SLL); - tree = parseFunction.apply(parser); - } catch (ParseCancellationException ex) { - // if we fail, parse with LL mode - tokenStream.seek(0); // rewind input stream - parser.reset(); - - parser.getInterpreter().setPredictionMode(PredictionMode.LL); - tree = parseFunction.apply(parser); - } - - LogicalPlanBuilder logicalPlanBuilder = new LogicalPlanBuilder(); - return (TreeNode) logicalPlanBuilder.visit(tree); - - } catch (StackOverflowError e) { - throw new ParsingException(e.getMessage()); - } - } - - public Expression createExpression(String expression) { - return (Expression) parse(expression, DorisParser::expression); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index 88ae78aa87..50899b7f31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -23,6 +23,8 @@ package org.apache.doris.nereids.properties; public class PhysicalProperties { private DistributionSpec distributionDesc; + public PhysicalProperties() {} + public DistributionSpec getDistributionDesc() { return distributionDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/qe/Executor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/qe/Executor.java deleted file mode 100644 index 342585f932..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/qe/Executor.java +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.qe; - -import org.apache.doris.nereids.parser.SqlParser; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.qe.ConnectContext; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * Temporary executor in Nereids. - */ -public class Executor { - private static final Logger LOG = LogManager.getLogger(Executor.class); - - private final String sql; - private final ConnectContext context; - - public Executor(String sql, ConnectContext context) { - this.sql = sql; - this.context = context; - } - - public void dryRun() throws Exception { - doExecute(false); - } - - public void execute() throws Exception { - doExecute(true); - } - - private void doExecute(boolean sendFragments) throws Exception { - LOG.info("==== input SQL: ====\n{}", sql); - System.out.println("==== input SQL: ====\n" + sql + "\n"); - - // parse phase - SqlParser parser = new SqlParser(); - LogicalPlan parsedPlan = parser.parse(sql); - LOG.info("==== parsed plan: ====\n{}", parsedPlan.treeString()); - System.out.println("==== parsed plan: ====\n" + parsedPlan.treeString() + "\n"); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java index 4544ea7bd7..cf2fb9e637 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java @@ -77,9 +77,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor planFragmentList = Lists.newArrayList(); + private final List planFragmentList = Lists.newArrayList(); - private DescriptorTable descTable = new DescriptorTable(); + private final DescriptorTable descTable = new DescriptorTable(); + private final List scanNodeList = new ArrayList<>(); private final IdGenerator fragmentIdGenerator = PlanFragmentId.createGenerator(); @@ -70,4 +73,11 @@ public class PlanContext { this.planFragmentList.add(planFragment); } + public void addScanNode(ScanNode scanNode) { + scanNodeList.add(scanNode); + } + + public List getScanNodeList() { + return scanNodeList; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPlanAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPlanAdapter.java new file mode 100644 index 0000000000..419fb2ba4f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPlanAdapter.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.analysis.RedirectStatus; +import org.apache.doris.analysis.StatementBase; + +/** + * This class is used for the compatibility and code reuse in. + * @see org.apache.doris.qe.ConnectProcessor + */ +public class LogicalPlanAdapter extends StatementBase { + + private final LogicalPlan logicalPlan; + + public LogicalPlanAdapter(LogicalPlan logicalPlan) { + this.logicalPlan = logicalPlan; + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } + + public LogicalPlan getLogicalPlan() { + return logicalPlan; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java new file mode 100644 index 0000000000..16066544e7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Planner.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InsertStmt; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; +import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TRuntimeFilterMode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * The planner is responsible for turning parse trees into plan fragments that can be shipped off to backends for + * execution. + */ +public class OriginalPlanner extends Planner { + private static final Logger LOG = LogManager.getLogger(OriginalPlanner.class); + + private PlannerContext plannerContext; + private SingleNodePlanner singleNodePlanner; + private DistributedPlanner distributedPlanner; + private Analyzer analyzer; + + public OriginalPlanner(Analyzer analyzer) { + this.analyzer = analyzer; + } + + public boolean isBlockQuery() { + return isBlockQuery; + } + + public PlannerContext getPlannerContext() { + return plannerContext; + } + + public List getScanNodes() { + if (singleNodePlanner == null) { + return Lists.newArrayList(); + } + return singleNodePlanner.getScanNodes(); + } + + public void plan(StatementBase queryStmt, TQueryOptions queryOptions) + throws UserException { + createPlanFragments(queryStmt, analyzer, queryOptions); + } + + /** + */ + private void setResultExprScale(Analyzer analyzer, ArrayList outputExprs) { + for (TupleDescriptor tupleDesc : analyzer.getDescTbl().getTupleDescs()) { + for (SlotDescriptor slotDesc : tupleDesc.getSlots()) { + for (Expr expr : outputExprs) { + List slotList = Lists.newArrayList(); + expr.getIds(null, slotList); + if (PrimitiveType.DECIMALV2 != expr.getType().getPrimitiveType()) { + continue; + } + + if (PrimitiveType.DECIMALV2 != slotDesc.getType().getPrimitiveType()) { + continue; + } + + if (slotList.contains(slotDesc.getId()) && null != slotDesc.getColumn()) { + int outputScale = slotDesc.getColumn().getScale(); + if (outputScale >= 0) { + if (outputScale > expr.getOutputScale()) { + expr.setOutputScale(outputScale); + } + } + } + } + } + } + } + + /** + * Return combined explain string for all plan fragments. + */ + @Override + public void appendTupleInfo(StringBuilder str) { + str.append(plannerContext.getRootAnalyzer().getDescTbl().getExplainString()); + } + + /** + * Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in + * a list such that element i of that list can only consume output of the following fragments j > i. + */ + public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions) + throws UserException { + QueryStmt queryStmt; + if (statement instanceof InsertStmt) { + queryStmt = ((InsertStmt) statement).getQueryStmt(); + } else { + queryStmt = (QueryStmt) statement; + } + + plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement); + singleNodePlanner = new SingleNodePlanner(plannerContext); + PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan(); + + if (VectorizedUtil.isVectorized()) { + singleNodePlan.convertToVectoriezd(); + } + + if (analyzer.getContext() != null + && analyzer.getContext().getSessionVariable().isEnableProjection() + && statement instanceof SelectStmt) { + ProjectPlanner projectPlanner = new ProjectPlanner(analyzer); + projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan); + } + + if (statement instanceof InsertStmt) { + InsertStmt insertStmt = (InsertStmt) statement; + insertStmt.prepareExpressions(); + } + + // TODO chenhao16 , no used materialization work + // compute referenced slots before calling computeMemLayout() + //analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null); + + setResultExprScale(analyzer, queryStmt.getResultExprs()); + + // materialized view selector + boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer); + if (selectFailed) { + throw new MVSelectFailedException("Failed to select materialize view"); + } + + /** + * - Under normal circumstances, computeMemLayout() will be executed + * at the end of the init function of the plan node. + * Such as : + * OlapScanNode { + * init () { + * analyzer.materializeSlots(conjuncts); + * computeTupleStatAndMemLayout(analyzer); + * computeStat(); + * } + * } + * - However Doris is currently unable to determine + * whether it is possible to cut or increase the columns in the tuple after PlanNode.init(). + * - Therefore, for the time being, computeMemLayout() can only be placed + * after the completion of the entire single node planner. + */ + analyzer.getDescTbl().computeMemLayout(); + singleNodePlan.finalize(analyzer); + + if (queryOptions.num_nodes == 1) { + // single-node execution; we're almost done + singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan); + fragments.add(new PlanFragment(plannerContext.getNextFragmentId(), singleNodePlan, + DataPartition.UNPARTITIONED)); + } else { + // all select query are unpartitioned. + distributedPlanner = new DistributedPlanner(plannerContext); + fragments = distributedPlanner.createPlanFragments(singleNodePlan); + } + + // Optimize the transfer of query statistic when query doesn't contain limit. + PlanFragment rootFragment = fragments.get(fragments.size() - 1); + QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer + = new QueryStatisticsTransferOptimizer(rootFragment); + queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer(); + + // Create runtime filters. + if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase() + .equals(TRuntimeFilterMode.OFF.name())) { + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot()); + } + + if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) { + InsertStmt insertStmt = (InsertStmt) statement; + rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); + rootFragment.setSink(insertStmt.getDataSink()); + insertStmt.complete(); + ArrayList exprs = ((InsertStmt) statement).getResultExprs(); + List resExprs = Expr.substituteList( + exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true); + rootFragment.setOutputExprs(resExprs); + } else { + List resExprs = Expr.substituteList(queryStmt.getResultExprs(), + rootFragment.getPlanRoot().getOutputSmap(), analyzer, false); + rootFragment.setOutputExprs(resExprs); + } + LOG.debug("finalize plan fragments"); + for (PlanFragment fragment : fragments) { + fragment.finalize(queryStmt); + } + + Collections.reverse(fragments); + + pushDownResultFileSink(analyzer); + + if (queryStmt instanceof SelectStmt) { + SelectStmt selectStmt = (SelectStmt) queryStmt; + if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { + isBlockQuery = true; + LOG.debug("this is block query"); + } else { + isBlockQuery = false; + LOG.debug("this isn't block query"); + } + } + } + + /** + * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise + * returns root unchanged. + */ + private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) + throws UserException { + Preconditions.checkNotNull(root); + // List conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds()); + + List conjuncts = analyzer.getUnassignedConjuncts(root); + if (conjuncts.isEmpty()) { + return root; + } + // evaluate conjuncts in SelectNode + SelectNode selectNode = new SelectNode(plannerContext.getNextNodeId(), root, conjuncts); + selectNode.init(analyzer); + Preconditions.checkState(selectNode.hasValidStats()); + return selectNode; + } + + /** + * This function is mainly used to try to push the top-level result file sink down one layer. + * The result file sink after the pushdown can realize the function of concurrently exporting the result set. + * Push down needs to meet the following conditions: + * 1. The query enables the session variable of the concurrent export result set + * 2. The top-level fragment is not a merge change node + * 3. The export method uses the s3 method + * + * After satisfying the above three conditions, + * the result file sink and the associated output expr will be pushed down to the next layer. + * The second plan fragment performs expression calculation and derives the result set. + * The top plan fragment will only summarize the status of the exported result set and return it to fe. + */ + private void pushDownResultFileSink(Analyzer analyzer) { + if (fragments.size() < 1) { + return; + } + if (!(fragments.get(0).getSink() instanceof ResultFileSink)) { + return; + } + if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) { + return; + } + if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) { + return; + } + PlanFragment topPlanFragment = fragments.get(0); + ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot(); + // try to push down result file sink + if (topPlanNode.isMergingExchange()) { + return; + } + PlanFragment secondPlanFragment = fragments.get(1); + ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink(); + if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) { + return; + } + if (secondPlanFragment.getOutputExprs() != null) { + return; + } + // create result file sink desc + TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer); + resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink()); + resultFileSink.setOutputTupleId(fileStatusDesc.getId()); + secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs()); + secondPlanFragment.resetSink(resultFileSink); + ResultSink resultSink = new ResultSink(topPlanNode.getId()); + topPlanFragment.resetSink(resultSink); + topPlanFragment.resetOutputExprs(fileStatusDesc); + topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId())); + } + + /** + * Construct a tuple for file status, the tuple schema as following: + * | FileNumber | Int | + * | TotalRows | Bigint | + * | FileSize | Bigint | + * | URL | Varchar | + */ + private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) { + TupleDescriptor resultFileStatusTupleDesc = + analyzer.getDescTbl().createTupleDescriptor("result_file_status"); + resultFileStatusTupleDesc.setIsMaterialized(true); + SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); + fileNumber.setLabel("FileNumber"); + fileNumber.setType(ScalarType.createType(PrimitiveType.INT)); + fileNumber.setIsMaterialized(true); + fileNumber.setIsNullable(false); + SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); + totalRows.setLabel("TotalRows"); + totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT)); + totalRows.setIsMaterialized(true); + totalRows.setIsNullable(false); + SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); + fileSize.setLabel("FileSize"); + fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT)); + fileSize.setIsMaterialized(true); + fileSize.setIsNullable(false); + SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); + url.setLabel("URL"); + url.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + url.setIsMaterialized(true); + url.setIsNullable(false); + resultFileStatusTupleDesc.computeStatAndMemLayout(); + return resultFileStatusTupleDesc; + } + + private static class QueryStatisticsTransferOptimizer { + private final PlanFragment root; + + public QueryStatisticsTransferOptimizer(PlanFragment root) { + Preconditions.checkNotNull(root); + this.root = root; + } + + public void optimizeQueryStatisticsTransfer() { + optimizeQueryStatisticsTransfer(root, null); + } + + private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) { + if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) { + fragment.setTransferQueryStatisticsWithEveryBatch(true); + } + for (PlanFragment child : fragment.getChildren()) { + optimizeQueryStatisticsTransfer(child, fragment); + } + } + + // Check whether leaf node contains limit. + private boolean hasLimit(PlanNode ancestor, PlanNode successor) { + final List exchangeNodes = Lists.newArrayList(); + collectExchangeNode(ancestor, exchangeNodes); + for (PlanNode leaf : exchangeNodes) { + if (leaf.getChild(0) == successor + && leaf.hasLimit()) { + return true; + } + } + return false; + } + + private void collectExchangeNode(PlanNode planNode, List exchangeNodes) { + if (planNode instanceof ExchangeNode) { + exchangeNodes.add(planNode); + } + + for (PlanNode child : planNode.getChildren()) { + if (child instanceof ExchangeNode) { + exchangeNodes.add(child); + } else { + collectExchangeNode(child, exchangeNodes); + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index c563ae126c..f21a988b0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -6,7 +6,7 @@ // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an @@ -14,34 +14,15 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Planner.java -// and modified by Doris package org.apache.doris.planner; -import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ExplainOptions; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.InsertStmt; -import org.apache.doris.analysis.QueryStmt; -import org.apache.doris.analysis.SelectStmt; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.StatementBase; -import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.PlanTreeBuilder; import org.apache.doris.common.profile.PlanTreePrinter; -import org.apache.doris.common.util.VectorizedUtil; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; -import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TQueryOptions; -import org.apache.doris.thrift.TRuntimeFilterMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -49,81 +30,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -/** - * The planner is responsible for turning parse trees into plan fragments that can be shipped off to backends for - * execution. - */ -public class Planner { - private static final Logger LOG = LogManager.getLogger(Planner.class); +public abstract class Planner { - private boolean isBlockQuery = false; + private static final Logger LOG = LogManager.getLogger(Planner.class); protected ArrayList fragments = Lists.newArrayList(); - private PlannerContext plannerContext; - private SingleNodePlanner singleNodePlanner; - private DistributedPlanner distributedPlanner; + protected boolean isBlockQuery = false; - public boolean isBlockQuery() { - return isBlockQuery; - } + public abstract List getScanNodes(); - public List getFragments() { - return fragments; - } + public abstract void plan(StatementBase queryStmt, + TQueryOptions queryOptions) throws UserException; - public PlannerContext getPlannerContext() { - return plannerContext; - } - - public List getScanNodes() { - if (singleNodePlanner == null) { - return Lists.newArrayList(); - } - return singleNodePlanner.getScanNodes(); - } - - public void plan(StatementBase queryStmt, Analyzer analyzer, TQueryOptions queryOptions) - throws UserException { - createPlanFragments(queryStmt, analyzer, queryOptions); - } - - /** - */ - private void setResultExprScale(Analyzer analyzer, ArrayList outputExprs) { - for (TupleDescriptor tupleDesc : analyzer.getDescTbl().getTupleDescs()) { - for (SlotDescriptor slotDesc : tupleDesc.getSlots()) { - for (Expr expr : outputExprs) { - List slotList = Lists.newArrayList(); - expr.getIds(null, slotList); - if (PrimitiveType.DECIMALV2 != expr.getType().getPrimitiveType()) { - continue; - } - - if (PrimitiveType.DECIMALV2 != slotDesc.getType().getPrimitiveType()) { - continue; - } - - if (slotList.contains(slotDesc.getId()) && null != slotDesc.getColumn()) { - int outputScale = slotDesc.getColumn().getScale(); - if (outputScale >= 0) { - if (outputScale > expr.getOutputScale()) { - expr.setOutputScale(outputScale); - } - } - } - } - } - } - } - - /** - * Return combined explain string for all plan fragments. - */ - public String getExplainString(List fragments, ExplainOptions explainOptions) { + public String getExplainString(ExplainOptions explainOptions) { Preconditions.checkNotNull(explainOptions); if (explainOptions.isGraph()) { // print the plan graph @@ -138,7 +60,9 @@ public class Planner { } // print text plan - TExplainLevel explainLevel = explainOptions.isVerbose() ? TExplainLevel.VERBOSE : TExplainLevel.NORMAL; + org.apache.doris.thrift.TExplainLevel + explainLevel = explainOptions.isVerbose() + ? org.apache.doris.thrift.TExplainLevel.VERBOSE : org.apache.doris.thrift.TExplainLevel.NORMAL; StringBuilder str = new StringBuilder(); for (int i = 0; i < fragments.size(); ++i) { PlanFragment fragment = fragments.get(i); @@ -149,287 +73,20 @@ public class Planner { str.append("PLAN FRAGMENT " + i + "\n"); str.append(fragment.getExplainString(explainLevel)); } - if (explainLevel == TExplainLevel.VERBOSE) { - str.append(plannerContext.getRootAnalyzer().getDescTbl().getExplainString()); + if (explainLevel == org.apache.doris.thrift.TExplainLevel.VERBOSE) { + appendTupleInfo(str); } return str.toString(); } - /** - * Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in - * a list such that element i of that list can only consume output of the following fragments j > i. - */ - public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions) - throws UserException { - QueryStmt queryStmt; - if (statement instanceof InsertStmt) { - queryStmt = ((InsertStmt) statement).getQueryStmt(); - } else { - queryStmt = (QueryStmt) statement; - } + public void appendTupleInfo(StringBuilder stringBuilder) {} - plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement); - singleNodePlanner = new SingleNodePlanner(plannerContext); - PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan(); - - if (VectorizedUtil.isVectorized()) { - singleNodePlan.convertToVectoriezd(); - } - - if (analyzer.getContext() != null - && analyzer.getContext().getSessionVariable().isEnableProjection() - && statement instanceof SelectStmt) { - ProjectPlanner projectPlanner = new ProjectPlanner(analyzer); - projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan); - } - - if (statement instanceof InsertStmt) { - InsertStmt insertStmt = (InsertStmt) statement; - insertStmt.prepareExpressions(); - } - - // TODO chenhao16 , no used materialization work - // compute referenced slots before calling computeMemLayout() - //analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null); - - setResultExprScale(analyzer, queryStmt.getResultExprs()); - - // materialized view selector - boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer); - if (selectFailed) { - throw new MVSelectFailedException("Failed to select materialize view"); - } - - /** - * - Under normal circumstances, computeMemLayout() will be executed - * at the end of the init function of the plan node. - * Such as : - * OlapScanNode { - * init () { - * analyzer.materializeSlots(conjuncts); - * computeTupleStatAndMemLayout(analyzer); - * computeStat(); - * } - * } - * - However Doris is currently unable to determine - * whether it is possible to cut or increase the columns in the tuple after PlanNode.init(). - * - Therefore, for the time being, computeMemLayout() can only be placed - * after the completion of the entire single node planner. - */ - analyzer.getDescTbl().computeMemLayout(); - singleNodePlan.finalize(analyzer); - - if (queryOptions.num_nodes == 1) { - // single-node execution; we're almost done - singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan); - fragments.add(new PlanFragment(plannerContext.getNextFragmentId(), singleNodePlan, - DataPartition.UNPARTITIONED)); - } else { - // all select query are unpartitioned. - distributedPlanner = new DistributedPlanner(plannerContext); - fragments = distributedPlanner.createPlanFragments(singleNodePlan); - } - - // Optimize the transfer of query statistic when query doesn't contain limit. - PlanFragment rootFragment = fragments.get(fragments.size() - 1); - QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer - = new QueryStatisticsTransferOptimizer(rootFragment); - queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer(); - - // Create runtime filters. - if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase() - .equals(TRuntimeFilterMode.OFF.name())) { - RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot()); - } - - if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) { - InsertStmt insertStmt = (InsertStmt) statement; - rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); - rootFragment.setSink(insertStmt.getDataSink()); - insertStmt.complete(); - ArrayList exprs = ((InsertStmt) statement).getResultExprs(); - List resExprs = Expr.substituteList( - exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true); - rootFragment.setOutputExprs(resExprs); - } else { - List resExprs = Expr.substituteList(queryStmt.getResultExprs(), - rootFragment.getPlanRoot().getOutputSmap(), analyzer, false); - rootFragment.setOutputExprs(resExprs); - } - LOG.debug("finalize plan fragments"); - for (PlanFragment fragment : fragments) { - fragment.finalize(queryStmt); - } - - Collections.reverse(fragments); - - pushDownResultFileSink(analyzer); - - if (queryStmt instanceof SelectStmt) { - SelectStmt selectStmt = (SelectStmt) queryStmt; - if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { - isBlockQuery = true; - LOG.debug("this is block query"); - } else { - isBlockQuery = false; - LOG.debug("this isn't block query"); - } - } + public List getFragments() { + return fragments; } - /** - * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise - * returns root unchanged. - */ - private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) - throws UserException { - Preconditions.checkNotNull(root); - // List conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds()); - - List conjuncts = analyzer.getUnassignedConjuncts(root); - if (conjuncts.isEmpty()) { - return root; - } - // evaluate conjuncts in SelectNode - SelectNode selectNode = new SelectNode(plannerContext.getNextNodeId(), root, conjuncts); - selectNode.init(analyzer); - Preconditions.checkState(selectNode.hasValidStats()); - return selectNode; + public boolean isBlockQuery() { + return isBlockQuery; } - /** - * This function is mainly used to try to push the top-level result file sink down one layer. - * The result file sink after the pushdown can realize the function of concurrently exporting the result set. - * Push down needs to meet the following conditions: - * 1. The query enables the session variable of the concurrent export result set - * 2. The top-level fragment is not a merge change node - * 3. The export method uses the s3 method - * - * After satisfying the above three conditions, - * the result file sink and the associated output expr will be pushed down to the next layer. - * The second plan fragment performs expression calculation and derives the result set. - * The top plan fragment will only summarize the status of the exported result set and return it to fe. - */ - private void pushDownResultFileSink(Analyzer analyzer) { - if (fragments.size() < 1) { - return; - } - if (!(fragments.get(0).getSink() instanceof ResultFileSink)) { - return; - } - if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) { - return; - } - if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) { - return; - } - PlanFragment topPlanFragment = fragments.get(0); - ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot(); - // try to push down result file sink - if (topPlanNode.isMergingExchange()) { - return; - } - PlanFragment secondPlanFragment = fragments.get(1); - ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink(); - if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) { - return; - } - if (secondPlanFragment.getOutputExprs() != null) { - return; - } - // create result file sink desc - TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer); - resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink()); - resultFileSink.setOutputTupleId(fileStatusDesc.getId()); - secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs()); - secondPlanFragment.resetSink(resultFileSink); - ResultSink resultSink = new ResultSink(topPlanNode.getId()); - topPlanFragment.resetSink(resultSink); - topPlanFragment.resetOutputExprs(fileStatusDesc); - topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId())); - } - - /** - * Construct a tuple for file status, the tuple schema as following: - * | FileNumber | Int | - * | TotalRows | Bigint | - * | FileSize | Bigint | - * | URL | Varchar | - */ - private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) { - TupleDescriptor resultFileStatusTupleDesc = - analyzer.getDescTbl().createTupleDescriptor("result_file_status"); - resultFileStatusTupleDesc.setIsMaterialized(true); - SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); - fileNumber.setLabel("FileNumber"); - fileNumber.setType(ScalarType.createType(PrimitiveType.INT)); - fileNumber.setIsMaterialized(true); - fileNumber.setIsNullable(false); - SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); - totalRows.setLabel("TotalRows"); - totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT)); - totalRows.setIsMaterialized(true); - totalRows.setIsNullable(false); - SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); - fileSize.setLabel("FileSize"); - fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT)); - fileSize.setIsMaterialized(true); - fileSize.setIsNullable(false); - SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc); - url.setLabel("URL"); - url.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - url.setIsMaterialized(true); - url.setIsNullable(false); - resultFileStatusTupleDesc.computeStatAndMemLayout(); - return resultFileStatusTupleDesc; - } - - private static class QueryStatisticsTransferOptimizer { - private final PlanFragment root; - - public QueryStatisticsTransferOptimizer(PlanFragment root) { - Preconditions.checkNotNull(root); - this.root = root; - } - - public void optimizeQueryStatisticsTransfer() { - optimizeQueryStatisticsTransfer(root, null); - } - - private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) { - if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) { - fragment.setTransferQueryStatisticsWithEveryBatch(true); - } - for (PlanFragment child : fragment.getChildren()) { - optimizeQueryStatisticsTransfer(child, fragment); - } - } - - // Check whether leaf node contains limit. - private boolean hasLimit(PlanNode ancestor, PlanNode successor) { - final List exchangeNodes = Lists.newArrayList(); - collectExchangeNode(ancestor, exchangeNodes); - for (PlanNode leaf : exchangeNodes) { - if (leaf.getChild(0) == successor - && leaf.hasLimit()) { - return true; - } - } - return false; - } - - private void collectExchangeNode(PlanNode planNode, List exchangeNodes) { - if (planNode instanceof ExchangeNode) { - exchangeNodes.add(planNode); - } - - for (PlanNode child : planNode.getChildren()) { - if (child instanceof ExchangeNode) { - exchangeNodes.add(child); - } else { - collectExchangeNode(child, exchangeNodes); - } - } - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 512b8c61f3..c83f5eeb45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlPacket; import org.apache.doris.mysql.MysqlProto; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.MysqlServerStatusFlag; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.proto.Data; import org.apache.doris.service.FrontendOptions; @@ -201,7 +202,18 @@ public class ConnectProcessor { List> auditInfoList = Lists.newArrayList(); boolean alreadyAddedToAuditInfoList = false; try { - List stmts = analyze(originStmt); + List stmts = null; + if (ctx.getSessionVariable().isEnableNereids()) { + NereidsParser nereidsParser = new NereidsParser(); + try { + stmts = nereidsParser.parseSQL(originStmt); + } catch (Exception e) { + LOG.warn("SQL : {}, parse failed by new parser", originStmt, e); + } + } + if (stmts == null) { + stmts = analyze(originStmt); + } for (int i = 0; i < stmts.size(); ++i) { alreadyAddedToAuditInfoList = false; ctx.getState().reset(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 384401e5b3..e7cdfd37f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -192,6 +192,8 @@ public class SessionVariable implements Serializable, Writable { static final String ENABLE_ARRAY_TYPE = "enable_array_type"; + public static final String ENABLE_NEREIDS = "enable_nereids"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -471,6 +473,15 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_ARRAY_TYPE) private boolean enableArrayType = false; + /** + * as the new optimizer is not mature yet, use this var + * to control whether to use new optimizer, remove it when + * the new optimizer is fully developed. I hope that day + * would be coming soon. + */ + @VariableMgr.VarAttr(name = ENABLE_NEREIDS) + private boolean enableNereids = false; + public String getBlockEncryptionMode() { return blockEncryptionMode; } @@ -970,6 +981,14 @@ public class SessionVariable implements Serializable, Writable { this.enableArrayType = enableArrayType; } + public boolean isEnableNereids() { + return enableNereids; + } + + public void setEnableNereids(boolean enableNereids) { + this.enableNereids = enableNereids; + } + /** * Serialize to thrift object. * Used for rest api. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ba4ab0d200..edbc629be0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -84,7 +84,10 @@ import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlEofPacket; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; @@ -729,10 +732,14 @@ public class StmtExecutor implements ProfileWriter { } plannerProfile.setQueryAnalysisFinishTime(); - // create plan - planner = new Planner(); + if (parsedStmt instanceof LogicalPlanAdapter) { + // create plan + planner = new NereidsPlanner(context); + } else { + planner = new OriginalPlanner(analyzer); + } if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { - planner.plan(parsedStmt, analyzer, tQueryOptions); + planner.plan(parsedStmt, tQueryOptions); } // TODO(zc): // Preconditions.checkState(!analyzer.hasUnassignedConjuncts()); @@ -874,8 +881,12 @@ public class StmtExecutor implements ProfileWriter { newSelectStmt.reset(); analyzer = new Analyzer(context.getCatalog(), context); newSelectStmt.analyze(analyzer); - planner = new Planner(); - planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift()); + if (parsedStmt instanceof LogicalPlanAdapter) { + planner = new NereidsPlanner(context); + } else { + planner = new OriginalPlanner(analyzer); + } + planner.plan(newSelectStmt, context.getSessionVariable().toThrift()); } } sendResult(false, isSendFields, newSelectStmt, channel, cacheAnalyzer, cacheResult); @@ -929,7 +940,7 @@ public class StmtExecutor implements ProfileWriter { } if (queryStmt.isExplain()) { - String explainString = planner.getExplainString(planner.getFragments(), queryStmt.getExplainOptions()); + String explainString = planner.getExplainString(queryStmt.getExplainOptions()); handleExplainStmt(explainString); return; } @@ -1245,7 +1256,7 @@ public class StmtExecutor implements ProfileWriter { if (insertStmt.getQueryStmt().isExplain()) { ExplainOptions explainOptions = insertStmt.getQueryStmt().getExplainOptions(); insertStmt.setIsExplain(explainOptions); - String explainString = planner.getExplainString(planner.getFragments(), explainOptions); + String explainString = planner.getExplainString(explainOptions); handleExplainStmt(explainString); return; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java index a1d35a5b1b..5577a566be 100755 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java @@ -20,7 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.util.Util; -import org.apache.doris.planner.Planner; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.VariableMgr; import org.apache.doris.utframe.DorisAssert; @@ -562,28 +562,28 @@ public class SelectStmtTest { @Test public void testSelectHintSetVar() throws Exception { String sql = "SELECT sleep(3);"; - Planner planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan(); + OriginalPlanner planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan(); Assert.assertEquals(VariableMgr.getDefaultSessionVariable().getQueryTimeoutS(), planner.getPlannerContext().getQueryOptions().query_timeout); sql = "SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3);"; - planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan(); + planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan(); Assert.assertEquals(1, planner.getPlannerContext().getQueryOptions().query_timeout); sql = "select * from db1.partition_table where datekey=20200726"; - planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan(); + planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan(); Assert.assertEquals(VariableMgr.getDefaultSessionVariable().getMaxExecMemByte(), planner.getPlannerContext().getQueryOptions().mem_limit); sql = "select /*+ SET_VAR(exec_mem_limit = 8589934592) */ poi_id, count(*) from db1.partition_table " + "where datekey=20200726 group by 1"; - planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan(); + planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan(); Assert.assertEquals(8589934592L, planner.getPlannerContext().getQueryOptions().mem_limit); int queryTimeOut = dorisAssert.getSessionVariable().getQueryTimeoutS(); long execMemLimit = dorisAssert.getSessionVariable().getMaxExecMemByte(); sql = "select /*+ SET_VAR(exec_mem_limit = 8589934592, query_timeout = 1) */ 1 + 2;"; - planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan(); + planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan(); // session variable have been changed Assert.assertEquals(1, planner.getPlannerContext().getQueryOptions().query_timeout); Assert.assertEquals(8589934592L, planner.getPlannerContext().getQueryOptions().mem_limit); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/AnalyzeTest.java index 2832dfa115..f4a472fe7a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/AnalyzeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/AnalyzeTest.java @@ -20,7 +20,7 @@ package org.apache.doris.nereids; import org.apache.doris.nereids.analyzer.Unbound; import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob; import org.apache.doris.nereids.memo.Memo; -import org.apache.doris.nereids.parser.SqlParser; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.analysis.BindRelation; import org.apache.doris.nereids.rules.analysis.BindSlotReference; @@ -37,7 +37,7 @@ import java.util.List; public class AnalyzeTest extends TestWithFeService { - private final SqlParser parser = new SqlParser(); + private final NereidsParser parser = new NereidsParser(); @Override protected void runBeforeAll() throws Exception { @@ -67,8 +67,8 @@ public class AnalyzeTest extends TestWithFeService { Assertions.assertTrue(checkBound(analyzed)); } - private LogicalPlan analyze(String sql) { - LogicalPlan parsed = parser.parse(sql); + private LogicalPlan analyze(String sql) throws Exception { + LogicalPlan parsed = parser.parseSingle(sql); return analyze(parsed, connectContext); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java new file mode 100644 index 0000000000..f1104e97da --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.parser; + +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; + +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class NereidsParserTest { + + @Test + public void testParseMultiple() throws Exception { + NereidsParser nereidsParser = new NereidsParser(); + String sql = "SELECT b FROM test;SELECT a FROM test;"; + List logicalPlanList = nereidsParser.parseMultiple(sql); + Assertions.assertEquals(2, logicalPlanList.size()); + } + + @Test + public void testSingle() throws Exception { + NereidsParser nereidsParser = new NereidsParser(); + String sql = "SELECT * FROM test;"; + Exception exceptionOccurred = null; + try { + nereidsParser.parseSingle(sql); + } catch (Exception e) { + exceptionOccurred = e; + e.printStackTrace(); + } + Assert.assertNull(exceptionOccurred); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rewrite/ExpressionRewriteTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rewrite/ExpressionRewriteTest.java index 46ebcef0f3..0cb2e4c2be 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rewrite/ExpressionRewriteTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rewrite/ExpressionRewriteTest.java @@ -17,7 +17,7 @@ package org.apache.doris.nereids.rules.expression.rewrite; -import org.apache.doris.nereids.parser.SqlParser; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.expression.rewrite.rules.NormalizeExpressionRule; import org.apache.doris.nereids.rules.expression.rewrite.rules.SimplifyNotExprRule; import org.apache.doris.nereids.trees.expressions.Expression; @@ -29,7 +29,7 @@ import org.junit.Test; * all expr rewrite rule test case. */ public class ExpressionRewriteTest { - private final SqlParser parser = new SqlParser(); + private static final NereidsParser PARSER = new NereidsParser(); private ExpressionRuleExecutor executor; @Test @@ -57,8 +57,8 @@ public class ExpressionRewriteTest { } private void assertRewrite(String expression, String expected) { - Expression needRewriteExpression = parser.createExpression(expression); - Expression expectedExpression = parser.createExpression(expected); + Expression needRewriteExpression = PARSER.createExpression(expression); + Expression expectedExpression = PARSER.createExpression(expected); Expression rewrittenExpression = executor.rewrite(needRewriteExpression); Assert.assertEquals(expectedExpression, rewrittenExpression); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ExpressionParserTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ExpressionParserTest.java index 987760f6bf..b868c69584 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ExpressionParserTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ExpressionParserTest.java @@ -17,16 +17,16 @@ package org.apache.doris.nereids.trees.expressions; -import org.apache.doris.nereids.parser.SqlParser; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.TreeNode; import org.junit.Test; public class ExpressionParserTest { - private static final SqlParser PARSER = new SqlParser(); + private static final NereidsParser PARSER = new NereidsParser(); private void assertSql(String sql) throws Exception { - TreeNode treeNode = PARSER.parse(sql); + TreeNode treeNode = PARSER.parseSingle(sql); System.out.println(treeNode.toString()); } @@ -50,7 +50,7 @@ public class ExpressionParserTest { @Test public void testSqlAnd() throws Exception { String sql = "select * from test1 where a > 1 and b > 1"; - TreeNode treeNode = PARSER.parse(sql); + TreeNode treeNode = PARSER.parseSingle(sql); System.out.println(treeNode); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java index c8331573e5..4f119bc4e4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java @@ -133,16 +133,14 @@ public class DistributedPlannerTest { StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)")); sql = "explain select * from db1.tbl1 join [SHUFFLE] db1.tbl2 on tbl1.k1 = tbl2.k3"; stmtExecutor = new StmtExecutor(ctx, sql); stmtExecutor.execute(); planner = stmtExecutor.planner(); - fragments = planner.getFragments(); - plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + plan = planner.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)")); } @@ -152,8 +150,7 @@ public class DistributedPlannerTest { StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)")); double originThreshold = ctx.getSessionVariable().autoBroadcastJoinThreshold; @@ -162,8 +159,7 @@ public class DistributedPlannerTest { stmtExecutor = new StmtExecutor(ctx, sql); stmtExecutor.execute(); planner = stmtExecutor.planner(); - fragments = planner.getFragments(); - plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + plan = planner.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)")); } finally { ctx.getSessionVariable().autoBroadcastJoinThreshold = originThreshold; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java index 73fe9bf24b..d52314518a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java @@ -93,8 +93,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); - List fragments1 = planner1.getFragments(); - String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(false, false)); + String plan1 = planner1.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan1, "UNION")); String sql2 = "explain select * from db1.tbl1 where k1='a' and k4=1\n" + "union distinct\n" @@ -118,8 +117,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2); stmtExecutor2.execute(); Planner planner2 = stmtExecutor2.planner(); - List fragments2 = planner2.getFragments(); - String plan2 = planner2.getExplainString(fragments2, new ExplainOptions(false, false)); + String plan2 = planner2.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(4, StringUtils.countMatches(plan2, "UNION")); // intersect @@ -134,8 +132,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor3 = new StmtExecutor(connectContext, sql3); stmtExecutor3.execute(); Planner planner3 = stmtExecutor3.planner(); - List fragments3 = planner3.getFragments(); - String plan3 = planner3.getExplainString(fragments3, new ExplainOptions(false, false)); + String plan3 = planner3.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan3, "INTERSECT")); String sql4 = "explain select * from db1.tbl1 where k1='a' and k4=1\n" + "intersect distinct\n" @@ -160,8 +157,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor4 = new StmtExecutor(connectContext, sql4); stmtExecutor4.execute(); Planner planner4 = stmtExecutor4.planner(); - List fragments4 = planner4.getFragments(); - String plan4 = planner4.getExplainString(fragments4, new ExplainOptions(false, false)); + String plan4 = planner4.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(3, StringUtils.countMatches(plan4, "INTERSECT")); // except @@ -176,8 +172,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor5 = new StmtExecutor(connectContext, sql5); stmtExecutor5.execute(); Planner planner5 = stmtExecutor5.planner(); - List fragments5 = planner5.getFragments(); - String plan5 = planner5.getExplainString(fragments5, new ExplainOptions(false, false)); + String plan5 = planner5.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan5, "EXCEPT")); String sql6 = "select * from db1.tbl1 where k1='a' and k4=1\n" @@ -191,8 +186,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor6 = new StmtExecutor(connectContext, sql6); stmtExecutor6.execute(); Planner planner6 = stmtExecutor6.planner(); - List fragments6 = planner6.getFragments(); - String plan6 = planner6.getExplainString(fragments6, new ExplainOptions(false, false)); + String plan6 = planner6.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan6, "EXCEPT")); String sql7 = "select * from db1.tbl1 where k1='a' and k4=1\n" @@ -206,8 +200,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor7 = new StmtExecutor(connectContext, sql7); stmtExecutor7.execute(); Planner planner7 = stmtExecutor7.planner(); - List fragments7 = planner7.getFragments(); - String plan7 = planner7.getExplainString(fragments7, new ExplainOptions(false, false)); + String plan7 = planner7.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan7, "EXCEPT")); // mixed @@ -222,8 +215,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor8 = new StmtExecutor(connectContext, sql8); stmtExecutor8.execute(); Planner planner8 = stmtExecutor8.planner(); - List fragments8 = planner8.getFragments(); - String plan8 = planner8.getExplainString(fragments8, new ExplainOptions(false, false)); + String plan8 = planner8.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(1, StringUtils.countMatches(plan8, "UNION")); Assert.assertEquals(1, StringUtils.countMatches(plan8, "INTERSECT")); Assert.assertEquals(1, StringUtils.countMatches(plan8, "EXCEPT")); @@ -251,8 +243,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor9 = new StmtExecutor(connectContext, sql9); stmtExecutor9.execute(); Planner planner9 = stmtExecutor9.planner(); - List fragments9 = planner9.getFragments(); - String plan9 = planner9.getExplainString(fragments9, new ExplainOptions(false, false)); + String plan9 = planner9.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(2, StringUtils.countMatches(plan9, "UNION")); Assert.assertEquals(3, StringUtils.countMatches(plan9, "INTERSECT")); Assert.assertEquals(2, StringUtils.countMatches(plan9, "EXCEPT")); @@ -362,8 +353,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); - List fragments1 = planner1.getFragments(); - String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(true, false)); + String plan1 = planner1.getExplainString(new ExplainOptions(true, false)); Assert.assertEquals(2, StringUtils.countMatches(plan1, "nullIndicatorBit=0")); } @@ -413,8 +403,7 @@ public class PlannerTest extends TestWithFeService { e.printStackTrace(); } Planner planner1 = stmtExecutor1.planner(); - List fragments1 = planner1.getFragments(); - String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(false, false)); + String plan1 = planner1.getExplainString(new ExplainOptions(false, false)); StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2); try { @@ -423,8 +412,7 @@ public class PlannerTest extends TestWithFeService { e.printStackTrace(); } Planner planner2 = stmtExecutor2.planner(); - List fragments2 = planner2.getFragments(); - String plan2 = planner2.getExplainString(fragments2, new ExplainOptions(false, false)); + String plan2 = planner2.getExplainString(new ExplainOptions(false, false)); Assert.assertEquals(plan1, plan2); }; @@ -459,8 +447,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n")); } @@ -471,8 +458,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assertions.assertFalse(plan.contains("PREDICATES:")); } @@ -483,8 +469,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1, `k2` = 1\n")); } @@ -496,8 +481,7 @@ public class PlannerTest extends TestWithFeService { StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); stmtExecutor.execute(); Planner planner = stmtExecutor.planner(); - List fragments = planner.getFragments(); - String plan = planner.getExplainString(fragments, new ExplainOptions(false, false)); + String plan = planner.getExplainString(new ExplainOptions(false, false)); Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n")); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 8a079f6390..033f8bb766 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -33,10 +33,10 @@ import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.EmptySetNode; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; @@ -63,13 +63,7 @@ import java.util.Map; import java.util.Set; public class CoordinatorTest extends Coordinator { - static Planner planner = new Planner(); - static ConnectContext context = new ConnectContext(null); - static { - context.setQueryId(new TUniqueId(1, 2)); - context.setQualifiedUser("root"); - } @Mocked static Catalog catalog; @@ -77,17 +71,25 @@ public class CoordinatorTest extends Coordinator { static EditLog editLog; @Mocked static FrontendOptions frontendOptions; + + static ConnectContext context = new ConnectContext(null); static Analyzer analyzer = new Analyzer(catalog, context); + static OriginalPlanner originalPlanner = new OriginalPlanner(analyzer); + + static { + context.setQueryId(new TUniqueId(1, 2)); + context.setQualifiedUser("root"); + } public CoordinatorTest() { - super(context, analyzer, planner); + super(context, analyzer, originalPlanner); } private static Coordinator coor; @Test public void testComputeColocateJoinInstanceParam() { - Coordinator coordinator = new Coordinator(context, analyzer, planner); + Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner); PlanFragmentId planFragmentId = new PlanFragmentId(1); int scanNodeId = 1; @@ -279,7 +281,7 @@ public class CoordinatorTest extends Coordinator { @Test public void testColocateJoinAssignment() { - Coordinator coordinator = new Coordinator(context, analyzer, planner); + Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner); PlanFragmentId planFragmentId = new PlanFragmentId(1); int scanNodeId = 1; @@ -505,7 +507,7 @@ public class CoordinatorTest extends Coordinator { @Test public void testComputeScanRangeAssignmentByScheduler() { - Coordinator coordinator = new Coordinator(context, analyzer, planner); + Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner); PlanFragmentId planFragmentId = new PlanFragmentId(1); int scanNodeId = 1; Map> fragmentIdToScanNodeIds = new HashMap<>(); @@ -589,7 +591,7 @@ public class CoordinatorTest extends Coordinator { @Test public void testGetExecHostPortForFragmentIDAndBucketSeq() { - Coordinator coordinator = new Coordinator(context, analyzer, planner); + Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner); PlanFragmentId planFragmentId = new PlanFragmentId(1); // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} TScanRangeLocations tScanRangeLocations = new TScanRangeLocations(); @@ -712,7 +714,7 @@ public class CoordinatorTest extends Coordinator { @Test public void testComputeScanRangeAssignment() { - Coordinator coordinator = new Coordinator(context, analyzer, planner); + Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner); //TScanRangeLocations TScanRangeLocations tScanRangeLocations = new TScanRangeLocations(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 5b371995de..8be9d2433b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -37,7 +37,7 @@ import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlSerializer; -import org.apache.doris.planner.Planner; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TQueryOptions; @@ -174,7 +174,7 @@ public class StmtExecutorTest { @Test public void testSelect(@Mocked QueryStmt queryStmt, @Mocked SqlParser parser, - @Mocked Planner planner, + @Mocked OriginalPlanner planner, @Mocked Coordinator coordinator) throws Exception { Catalog catalog = Catalog.getCurrentCatalog(); Deencapsulation.setField(catalog, "canRead", new AtomicBoolean(true)); @@ -211,7 +211,7 @@ public class StmtExecutorTest { minTimes = 0; result = symbol; - planner.plan((QueryStmt) any, (Analyzer) any, (TQueryOptions) any); + planner.plan((QueryStmt) any, (TQueryOptions) any); minTimes = 0; // mock coordinator diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java index 1d74e49471..398f6112ef 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java @@ -204,7 +204,7 @@ public class DorisAssert { } } Planner planner = stmtExecutor.planner(); - String explainString = planner.getExplainString(planner.getFragments(), new ExplainOptions(false, false)); + String explainString = planner.getExplainString(new ExplainOptions(false, false)); System.out.println(explainString); return explainString; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 649043e9ac..feed9b0a58 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -319,7 +319,7 @@ public abstract class TestWithFeService { stmtExecutor.execute(); if (connectContext.getState().getStateType() != QueryState.MysqlStateType.ERR) { Planner planner = stmtExecutor.planner(); - return planner.getExplainString(planner.getFragments(), new ExplainOptions(isVerbose, false)); + return planner.getExplainString(new ExplainOptions(isVerbose, false)); } else { return connectContext.getState().getErrorMessage(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index e822667383..4e0a49016f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -282,7 +282,7 @@ public class UtFrameUtils { stmtExecutor.execute(); if (ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) { Planner planner = stmtExecutor.planner(); - return planner.getExplainString(planner.getFragments(), new ExplainOptions(isVerbose, false)); + return planner.getExplainString(new ExplainOptions(isVerbose, false)); } else { return ctx.getState().getErrorMessage(); }