[feature](nereids) Integrate nereids into current SQL process framework (#10304)
To integrate the Nereids optimizer, with its new SQL parser and planner, into the existing SQL process framework, I abstract a base class named "Planner" and let the planners from both Nereids and the stale optimizer implement it. To distinguish it from the origin Planner, I rename that Planner to OriginalPlanner. As we don't want to impact the existing logic too much, I defined a LogicalPlanAdapter to adapt the LogicalPlan that is the output of the new parser to the existing code. Besides, as the MySQL protocol supports sending multiple statements in one packet, I add a NereidsParser#parseSQL method to handle this properly.
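
For orientation, below is a minimal sketch of the call path this change enables, using only names introduced in this diff. It is illustrative, not code from the commit: the variables ctx, analyzer, originStmt, and queryOptions are assumed to be in scope, checked exceptions are ignored, and the real wiring lives in ConnectProcessor (see the last hunks below), which falls back to the stale analyze() path whenever the new parser fails.

    // Sketch only; assumes ctx (ConnectContext), analyzer (Analyzer),
    // originStmt (String) and queryOptions (TQueryOptions) are in scope.
    NereidsParser nereidsParser = new NereidsParser();
    // parseSQL accepts a packet that may carry several statements and wraps
    // every resulting LogicalPlan in a LogicalPlanAdapter, which extends
    // StatementBase so the existing framework can pass it around unchanged.
    List<StatementBase> stmts = nereidsParser.parseSQL(originStmt);

    // Both planners implement the same abstraction, so callers only see Planner.
    Planner planner = ctx.getSessionVariable().isEnableNereids()
            ? new NereidsPlanner(ctx)
            : new OriginalPlanner(analyzer);
    planner.plan(stmts.get(0), queryOptions);
    List<PlanFragment> fragments = planner.getFragments();
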
@@ -35,10 +35,10 @@ import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;

import com.google.common.base.Preconditions;
@@ -49,7 +49,7 @@ import java.util.List;
import java.util.Map;

public class UpdatePlanner extends Planner {
public class UpdatePlanner extends OriginalPlanner {

    private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
    private final IdGenerator<PlanFragmentId> fragmentIdGenerator =
@@ -65,6 +65,7 @@ public class UpdatePlanner extends Planner {

    public UpdatePlanner(long dbId, OlapTable targetTable, List<Expr> setExprs,
            TupleDescriptor srcTupleDesc, Analyzer analyzer) {
        super(analyzer);
        this.targetDBId = dbId;
        this.targetTable = targetTable;
        this.setExprs = setExprs;

@@ -17,27 +17,64 @@

package org.apache.doris.nereids;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.PhysicalPlanTranslator;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanContext;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

/**
 * Planner to do query plan in Nereids.
 */
public class Planner {
public class NereidsPlanner extends Planner {

    private PlannerContext plannerContext;
    private final ConnectContext ctx;
    private List<ScanNode> scanNodeList = null;

    public NereidsPlanner(ConnectContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void plan(StatementBase queryStmt,
            org.apache.doris.thrift.TQueryOptions queryOptions) throws UserException {
        if (!(queryStmt instanceof LogicalPlanAdapter)) {
            throw new RuntimeException("Wrong type of queryStmt, expected: <? extends LogicalPlanAdapter>");
        }
        LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt;
        PhysicalPlan physicalPlan = plan(logicalPlanAdapter.getLogicalPlan(), new PhysicalProperties(), ctx);
        PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator();
        PlanContext planContext = new PlanContext();
        physicalPlanTranslator.translatePlan(physicalPlan, planContext);
        fragments = new ArrayList<>(planContext.getPlanFragmentList());
        PlanFragment root = fragments.get(fragments.size() - 1);
        root.setOutputExprs(queryStmt.getResultExprs());
        if (VectorizedUtil.isVectorized()) {
            root.getPlanRoot().convertToVectoriezd();
        }
        scanNodeList = planContext.getScanNodeList();
    }

    /**
     * Do analyze and optimize for query plan.
@@ -67,6 +104,11 @@ public class Planner {
        return getRoot().extractPlan();
    }

    @Override
    public List<ScanNode> getScanNodes() {
        return scanNodeList;
    }

    public Group getRoot() {
        return plannerContext.getOptimizerContext().getMemo().getRoot();
    }
@@ -93,4 +135,9 @@ public class Planner {

        return physicalPlan;
    }

    @Override
    public boolean isBlockQuery() {
        return true;
    }
}
@@ -37,6 +37,7 @@ import org.apache.doris.nereids.DorisParser.JoinCriteriaContext;
import org.apache.doris.nereids.DorisParser.JoinRelationContext;
import org.apache.doris.nereids.DorisParser.LogicalBinaryContext;
import org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MultiStatementsContext;
import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
@@ -52,6 +53,7 @@ import org.apache.doris.nereids.DorisParser.SelectClauseContext;
import org.apache.doris.nereids.DorisParser.SingleStatementContext;
import org.apache.doris.nereids.DorisParser.SortItemContext;
import org.apache.doris.nereids.DorisParser.StarContext;
import org.apache.doris.nereids.DorisParser.StatementContext;
import org.apache.doris.nereids.DorisParser.StringLiteralContext;
import org.apache.doris.nereids.DorisParser.TableNameContext;
import org.apache.doris.nereids.DorisParser.WhereClauseContext;
@@ -147,6 +149,18 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
        return ParserUtils.withOrigin(ctx, f);
    }

    /**
     * Visit multi-statements.
     */
    public Object visitMultiStatements(MultiStatementsContext ctx) {
        List<LogicalPlan> logicalPlanList = new ArrayList<>();
        for (StatementContext stmtCtx : ctx.statement()) {
            LogicalPlan logicalPlan = (LogicalPlan) visit(stmtCtx);
            logicalPlanList.add(logicalPlan);
        }
        return logicalPlanList;
    }

    /* ********************************************************************************************
     * Plan parsing
     * ******************************************************************************************** */

@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.parser;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.nereids.DorisLexer;
import org.apache.doris.nereids.DorisParser;
import org.apache.doris.nereids.exceptions.ParsingException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter;

import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.atn.PredictionMode;
import org.antlr.v4.runtime.misc.ParseCancellationException;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Sql parser, convert sql DSL to logical plan.
 */
public class NereidsParser {

    /**
     * In MySQL protocol, client could send multi-statement in a single packet.
     * https://dev.mysql.com/doc/internals/en/com-set-option.html
     */
    public List<StatementBase> parseSQL(String originStr) throws Exception {
        List<LogicalPlan> logicalPlanList = parseMultiple(originStr);
        List<StatementBase> statementBaseList = new ArrayList<>();
        for (LogicalPlan logicalPlan : logicalPlanList) {
            LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalPlan);
            statementBaseList.add(logicalPlanAdapter);
        }
        return statementBaseList;
    }

    /**
     * parse sql DSL string.
     *
     * @param sql sql string
     * @return logical plan
     */
    public LogicalPlan parseSingle(String sql) throws Exception {
        return (LogicalPlan) parse(sql, DorisParser::singleStatement);
    }

    public List<LogicalPlan> parseMultiple(String sql) throws Exception {
        return (List<LogicalPlan>) parse(sql, DorisParser::multiStatements);
    }

    private Object parse(String sql, Function<DorisParser, ParserRuleContext> parseFunction) {
        try {
            ParserRuleContext tree = toAst(sql, parseFunction);
            LogicalPlanBuilder logicalPlanBuilder = new LogicalPlanBuilder();
            return logicalPlanBuilder.visit(tree);
        } catch (StackOverflowError e) {
            throw new ParsingException(e.getMessage());
        }
    }

    private ParserRuleContext toAst(String sql, Function<DorisParser, ParserRuleContext> parseFunction) {
        DorisLexer lexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        DorisParser parser = new DorisParser(tokenStream);
        // parser.addParseListener(PostProcessor)
        // parser.removeErrorListeners()
        // parser.addErrorListener(ParseErrorListener)
        ParserRuleContext tree;
        try {
            // first, try parsing with potentially faster SLL mode
            parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
            tree = parseFunction.apply(parser);
        } catch (ParseCancellationException ex) {
            // if we fail, parse with LL mode
            tokenStream.seek(0); // rewind input stream
            parser.reset();

            parser.getInterpreter().setPredictionMode(PredictionMode.LL);
            tree = parseFunction.apply(parser);
        }
        return tree;
    }

    public Expression createExpression(String expression) {
        return (Expression) parse(expression, DorisParser::expression);
    }
}
@@ -1,85 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.parser;

import org.apache.doris.nereids.DorisLexer;
import org.apache.doris.nereids.DorisParser;
import org.apache.doris.nereids.exceptions.ParsingException;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;

import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.atn.PredictionMode;
import org.antlr.v4.runtime.misc.ParseCancellationException;

import java.util.function.Function;

/**
 * Sql parser, convert sql DSL to logical plan.
 */
public class SqlParser {

    /**
     * parse sql DSL string.
     *
     * @param sql sql string
     * @return logical plan
     */
    public LogicalPlan parse(String sql) {
        return (LogicalPlan) parse(sql, DorisParser::singleStatement);
    }

    private TreeNode parse(String sql, Function<DorisParser, ParserRuleContext> parseFunction) {
        try {
            DorisLexer lexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
            CommonTokenStream tokenStream = new CommonTokenStream(lexer);
            DorisParser parser = new DorisParser(tokenStream);

            // parser.addParseListener(PostProcessor)
            // parser.removeErrorListeners()
            // parser.addErrorListener(ParseErrorListener)

            ParserRuleContext tree;
            try {
                // first, try parsing with potentially faster SLL mode
                parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
                tree = parseFunction.apply(parser);
            } catch (ParseCancellationException ex) {
                // if we fail, parse with LL mode
                tokenStream.seek(0); // rewind input stream
                parser.reset();

                parser.getInterpreter().setPredictionMode(PredictionMode.LL);
                tree = parseFunction.apply(parser);
            }

            LogicalPlanBuilder logicalPlanBuilder = new LogicalPlanBuilder();
            return (TreeNode) logicalPlanBuilder.visit(tree);

        } catch (StackOverflowError e) {
            throw new ParsingException(e.getMessage());
        }
    }

    public Expression createExpression(String expression) {
        return (Expression) parse(expression, DorisParser::expression);
    }
}
@@ -23,6 +23,8 @@ package org.apache.doris.nereids.properties;
public class PhysicalProperties {
    private DistributionSpec distributionDesc;

    public PhysicalProperties() {}

    public DistributionSpec getDistributionDesc() {
        return distributionDesc;
    }

@@ -1,59 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.qe;

import org.apache.doris.nereids.parser.SqlParser;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Temporary executor in Nereids.
 */
public class Executor {
    private static final Logger LOG = LogManager.getLogger(Executor.class);

    private final String sql;
    private final ConnectContext context;

    public Executor(String sql, ConnectContext context) {
        this.sql = sql;
        this.context = context;
    }

    public void dryRun() throws Exception {
        doExecute(false);
    }

    public void execute() throws Exception {
        doExecute(true);
    }

    private void doExecute(boolean sendFragments) throws Exception {
        LOG.info("==== input SQL: ====\n{}", sql);
        System.out.println("==== input SQL: ====\n" + sql + "\n");

        // parse phase
        SqlParser parser = new SqlParser();
        LogicalPlan parsedPlan = parser.parse(sql);
        LOG.info("==== parsed plan: ====\n{}", parsedPlan.treeString());
        System.out.println("==== parsed plan: ====\n" + parsedPlan.treeString() + "\n");
    }
}
@@ -77,9 +77,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
    }

    /**
     * Translate in following steps:
     * 1.
     *
     * Translate Agg.
     */
    @Override
    public PlanFragment visitPhysicalAggregation(
@@ -141,6 +139,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
        OlapTable olapTable = physicalOlapScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
        context.addScanNode(olapScanNode);
        // Create PlanFragment
        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
        context.addPlanFragment(planFragment);
@@ -194,6 +193,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
            childSortNode.setLimit(limit + offset);
        }
        childSortNode.setOffset(0);
        context.addPlanFragment(mergeFragment);
        return mergeFragment;
    }

@@ -228,6 +228,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
        rightFragment.setDestination(exchangeNode);
        crossJoinNode.setChild(0, leftFragment.getPlanRoot());
        leftFragment.setPlanRoot(crossJoinNode);
        context.addPlanFragment(leftFragment);
        return leftFragment;
    }

@@ -251,7 +252,9 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
        hashJoinNode.setLimit(physicalHashJoin.getLimited());
        leftFragment.setDestination((ExchangeNode) rightFragment.getPlanRoot());
        rightFragment.setDestination((ExchangeNode) leftFragmentPlanRoot);
        return new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition());
        PlanFragment result = new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition());
        context.addPlanFragment(result);
        return result;
    }

    @Override

@@ -24,19 +24,22 @@ import org.apache.doris.common.IdGenerator;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;

import com.clearspring.analytics.util.Lists;

import java.util.ArrayList;
import java.util.List;

/**
 * Context of physical plan.
 */
public class PlanContext {
    private List<PlanFragment> planFragmentList = Lists.newArrayList();
    private final List<PlanFragment> planFragmentList = Lists.newArrayList();

    private DescriptorTable descTable = new DescriptorTable();
    private final DescriptorTable descTable = new DescriptorTable();

    private final List<ScanNode> scanNodeList = new ArrayList<>();

    private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();

@@ -70,4 +73,11 @@ public class PlanContext {
        this.planFragmentList.add(planFragment);
    }

    public void addScanNode(ScanNode scanNode) {
        scanNodeList.add(scanNode);
    }

    public List<ScanNode> getScanNodeList() {
        return scanNodeList;
    }
}

@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.logical;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StatementBase;

/**
 * This class is used for compatibility and code reuse in
 * @see org.apache.doris.qe.ConnectProcessor
 */
public class LogicalPlanAdapter extends StatementBase {

    private final LogicalPlan logicalPlan;

    public LogicalPlanAdapter(LogicalPlan logicalPlan) {
        this.logicalPlan = logicalPlan;
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.NO_FORWARD;
    }

    public LogicalPlan getLogicalPlan() {
        return logicalPlan;
    }

}
@@ -0,0 +1,401 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Planner.java
// and modified by Doris

package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterMode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * The planner is responsible for turning parse trees into plan fragments that can be shipped off to backends for
 * execution.
 */
public class OriginalPlanner extends Planner {
    private static final Logger LOG = LogManager.getLogger(OriginalPlanner.class);

    private PlannerContext plannerContext;
    private SingleNodePlanner singleNodePlanner;
    private DistributedPlanner distributedPlanner;
    private Analyzer analyzer;

    public OriginalPlanner(Analyzer analyzer) {
        this.analyzer = analyzer;
    }

    public boolean isBlockQuery() {
        return isBlockQuery;
    }

    public PlannerContext getPlannerContext() {
        return plannerContext;
    }

    public List<ScanNode> getScanNodes() {
        if (singleNodePlanner == null) {
            return Lists.newArrayList();
        }
        return singleNodePlanner.getScanNodes();
    }

    public void plan(StatementBase queryStmt, TQueryOptions queryOptions)
            throws UserException {
        createPlanFragments(queryStmt, analyzer, queryOptions);
    }

    /**
     */
    private void setResultExprScale(Analyzer analyzer, ArrayList<Expr> outputExprs) {
        for (TupleDescriptor tupleDesc : analyzer.getDescTbl().getTupleDescs()) {
            for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
                for (Expr expr : outputExprs) {
                    List<SlotId> slotList = Lists.newArrayList();
                    expr.getIds(null, slotList);
                    if (PrimitiveType.DECIMALV2 != expr.getType().getPrimitiveType()) {
                        continue;
                    }

                    if (PrimitiveType.DECIMALV2 != slotDesc.getType().getPrimitiveType()) {
                        continue;
                    }

                    if (slotList.contains(slotDesc.getId()) && null != slotDesc.getColumn()) {
                        int outputScale = slotDesc.getColumn().getScale();
                        if (outputScale >= 0) {
                            if (outputScale > expr.getOutputScale()) {
                                expr.setOutputScale(outputScale);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Return combined explain string for all plan fragments.
     */
    @Override
    public void appendTupleInfo(StringBuilder str) {
        str.append(plannerContext.getRootAnalyzer().getDescTbl().getExplainString());
    }

    /**
     * Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in
     * a list such that element i of that list can only consume output of the following fragments j > i.
     */
    public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
            throws UserException {
        QueryStmt queryStmt;
        if (statement instanceof InsertStmt) {
            queryStmt = ((InsertStmt) statement).getQueryStmt();
        } else {
            queryStmt = (QueryStmt) statement;
        }

        plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement);
        singleNodePlanner = new SingleNodePlanner(plannerContext);
        PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

        if (VectorizedUtil.isVectorized()) {
            singleNodePlan.convertToVectoriezd();
        }

        if (analyzer.getContext() != null
                && analyzer.getContext().getSessionVariable().isEnableProjection()
                && statement instanceof SelectStmt) {
            ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
            projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
        }

        if (statement instanceof InsertStmt) {
            InsertStmt insertStmt = (InsertStmt) statement;
            insertStmt.prepareExpressions();
        }

        // TODO chenhao16 , no used materialization work
        // compute referenced slots before calling computeMemLayout()
        //analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);

        setResultExprScale(analyzer, queryStmt.getResultExprs());

        // materialized view selector
        boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
        if (selectFailed) {
            throw new MVSelectFailedException("Failed to select materialize view");
        }

        /**
         * - Under normal circumstances, computeMemLayout() will be executed
         *   at the end of the init function of the plan node.
         *   Such as :
         *   OlapScanNode {
         *       init () {
         *           analyzer.materializeSlots(conjuncts);
         *           computeTupleStatAndMemLayout(analyzer);
         *           computeStat();
         *       }
         *   }
         * - However Doris is currently unable to determine
         *   whether it is possible to cut or increase the columns in the tuple after PlanNode.init().
         * - Therefore, for the time being, computeMemLayout() can only be placed
         *   after the completion of the entire single node planner.
         */
        analyzer.getDescTbl().computeMemLayout();
        singleNodePlan.finalize(analyzer);

        if (queryOptions.num_nodes == 1) {
            // single-node execution; we're almost done
            singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan);
            fragments.add(new PlanFragment(plannerContext.getNextFragmentId(), singleNodePlan,
                    DataPartition.UNPARTITIONED));
        } else {
            // all select query are unpartitioned.
            distributedPlanner = new DistributedPlanner(plannerContext);
            fragments = distributedPlanner.createPlanFragments(singleNodePlan);
        }

        // Optimize the transfer of query statistic when query doesn't contain limit.
        PlanFragment rootFragment = fragments.get(fragments.size() - 1);
        QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer
                = new QueryStatisticsTransferOptimizer(rootFragment);
        queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();

        // Create runtime filters.
        if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase()
                .equals(TRuntimeFilterMode.OFF.name())) {
            RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
        }

        if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) {
            InsertStmt insertStmt = (InsertStmt) statement;
            rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
            rootFragment.setSink(insertStmt.getDataSink());
            insertStmt.complete();
            ArrayList<Expr> exprs = ((InsertStmt) statement).getResultExprs();
            List<Expr> resExprs = Expr.substituteList(
                    exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
            rootFragment.setOutputExprs(resExprs);
        } else {
            List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(),
                    rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);
            rootFragment.setOutputExprs(resExprs);
        }
        LOG.debug("finalize plan fragments");
        for (PlanFragment fragment : fragments) {
            fragment.finalize(queryStmt);
        }

        Collections.reverse(fragments);

        pushDownResultFileSink(analyzer);

        if (queryStmt instanceof SelectStmt) {
            SelectStmt selectStmt = (SelectStmt) queryStmt;
            if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
                isBlockQuery = true;
                LOG.debug("this is block query");
            } else {
                isBlockQuery = false;
                LOG.debug("this isn't block query");
            }
        }
    }

    /**
     * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise
     * returns root unchanged.
     */
    private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
            throws UserException {
        Preconditions.checkNotNull(root);
        // List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds());

        List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
        if (conjuncts.isEmpty()) {
            return root;
        }
        // evaluate conjuncts in SelectNode
        SelectNode selectNode = new SelectNode(plannerContext.getNextNodeId(), root, conjuncts);
        selectNode.init(analyzer);
        Preconditions.checkState(selectNode.hasValidStats());
        return selectNode;
    }

    /**
     * This function is mainly used to try to push the top-level result file sink down one layer.
     * The result file sink after the pushdown can realize the function of concurrently exporting the result set.
     * Push down needs to meet the following conditions:
     * 1. The query enables the session variable of the concurrent export result set
     * 2. The top-level fragment is not a merge change node
     * 3. The export method uses the s3 method
     *
     * After satisfying the above three conditions,
     * the result file sink and the associated output expr will be pushed down to the next layer.
     * The second plan fragment performs expression calculation and derives the result set.
     * The top plan fragment will only summarize the status of the exported result set and return it to fe.
     */
    private void pushDownResultFileSink(Analyzer analyzer) {
        if (fragments.size() < 1) {
            return;
        }
        if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
            return;
        }
        if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
            return;
        }
        if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
            return;
        }
        PlanFragment topPlanFragment = fragments.get(0);
        ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot();
        // try to push down result file sink
        if (topPlanNode.isMergingExchange()) {
            return;
        }
        PlanFragment secondPlanFragment = fragments.get(1);
        ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink();
        if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) {
            return;
        }
        if (secondPlanFragment.getOutputExprs() != null) {
            return;
        }
        // create result file sink desc
        TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
        resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
        resultFileSink.setOutputTupleId(fileStatusDesc.getId());
        secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
        secondPlanFragment.resetSink(resultFileSink);
        ResultSink resultSink = new ResultSink(topPlanNode.getId());
        topPlanFragment.resetSink(resultSink);
        topPlanFragment.resetOutputExprs(fileStatusDesc);
        topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
    }

    /**
     * Construct a tuple for file status, the tuple schema as following:
     * | FileNumber | Int     |
     * | TotalRows  | Bigint  |
     * | FileSize   | Bigint  |
     * | URL        | Varchar |
     */
    private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {
        TupleDescriptor resultFileStatusTupleDesc =
                analyzer.getDescTbl().createTupleDescriptor("result_file_status");
        resultFileStatusTupleDesc.setIsMaterialized(true);
        SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
        fileNumber.setLabel("FileNumber");
        fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
        fileNumber.setIsMaterialized(true);
        fileNumber.setIsNullable(false);
        SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
        totalRows.setLabel("TotalRows");
        totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
        totalRows.setIsMaterialized(true);
        totalRows.setIsNullable(false);
        SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
        fileSize.setLabel("FileSize");
        fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
        fileSize.setIsMaterialized(true);
        fileSize.setIsNullable(false);
        SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
        url.setLabel("URL");
        url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
        url.setIsMaterialized(true);
        url.setIsNullable(false);
        resultFileStatusTupleDesc.computeStatAndMemLayout();
        return resultFileStatusTupleDesc;
    }

    private static class QueryStatisticsTransferOptimizer {
        private final PlanFragment root;

        public QueryStatisticsTransferOptimizer(PlanFragment root) {
            Preconditions.checkNotNull(root);
            this.root = root;
        }

        public void optimizeQueryStatisticsTransfer() {
            optimizeQueryStatisticsTransfer(root, null);
        }

        private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) {
            if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) {
                fragment.setTransferQueryStatisticsWithEveryBatch(true);
            }
            for (PlanFragment child : fragment.getChildren()) {
                optimizeQueryStatisticsTransfer(child, fragment);
            }
        }

        // Check whether leaf node contains limit.
        private boolean hasLimit(PlanNode ancestor, PlanNode successor) {
            final List<PlanNode> exchangeNodes = Lists.newArrayList();
            collectExchangeNode(ancestor, exchangeNodes);
            for (PlanNode leaf : exchangeNodes) {
                if (leaf.getChild(0) == successor
                        && leaf.hasLimit()) {
                    return true;
                }
            }
            return false;
        }

        private void collectExchangeNode(PlanNode planNode, List<PlanNode> exchangeNodes) {
            if (planNode instanceof ExchangeNode) {
                exchangeNodes.add(planNode);
            }

            for (PlanNode child : planNode.getChildren()) {
                if (child instanceof ExchangeNode) {
                    exchangeNodes.add(child);
                } else {
                    collectExchangeNode(child, exchangeNodes);
                }
            }
        }
    }
}
@ -6,7 +6,7 @@
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
@ -14,34 +14,15 @@
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Planner.java
|
||||
// and modified by Doris
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.ExplainOptions;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.InsertStmt;
|
||||
import org.apache.doris.analysis.QueryStmt;
|
||||
import org.apache.doris.analysis.SelectStmt;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.profile.PlanTreeBuilder;
|
||||
import org.apache.doris.common.profile.PlanTreePrinter;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TRuntimeFilterMode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -49,81 +30,22 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The planner is responsible for turning parse trees into plan fragments that can be shipped off to backends for
|
||||
* execution.
|
||||
*/
|
||||
public class Planner {
|
||||
private static final Logger LOG = LogManager.getLogger(Planner.class);
|
||||
public abstract class Planner {
|
||||
|
||||
private boolean isBlockQuery = false;
|
||||
private static final Logger LOG = LogManager.getLogger(Planner.class);
|
||||
|
||||
protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
|
||||
|
||||
private PlannerContext plannerContext;
|
||||
private SingleNodePlanner singleNodePlanner;
|
||||
private DistributedPlanner distributedPlanner;
|
||||
protected boolean isBlockQuery = false;
|
||||
|
||||
public boolean isBlockQuery() {
|
||||
return isBlockQuery;
|
||||
}
|
||||
public abstract List<ScanNode> getScanNodes();
|
||||
|
||||
public List<PlanFragment> getFragments() {
|
||||
return fragments;
|
||||
}
|
||||
public abstract void plan(StatementBase queryStmt,
|
||||
TQueryOptions queryOptions) throws UserException;
|
||||
|
||||
public PlannerContext getPlannerContext() {
|
||||
return plannerContext;
|
||||
}
|
||||
|
||||
public List<ScanNode> getScanNodes() {
|
||||
if (singleNodePlanner == null) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
return singleNodePlanner.getScanNodes();
|
||||
}
|
||||
|
||||
public void plan(StatementBase queryStmt, Analyzer analyzer, TQueryOptions queryOptions)
|
||||
throws UserException {
|
||||
createPlanFragments(queryStmt, analyzer, queryOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
private void setResultExprScale(Analyzer analyzer, ArrayList<Expr> outputExprs) {
|
||||
for (TupleDescriptor tupleDesc : analyzer.getDescTbl().getTupleDescs()) {
|
||||
for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
|
||||
for (Expr expr : outputExprs) {
|
||||
List<SlotId> slotList = Lists.newArrayList();
|
||||
expr.getIds(null, slotList);
|
||||
if (PrimitiveType.DECIMALV2 != expr.getType().getPrimitiveType()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (PrimitiveType.DECIMALV2 != slotDesc.getType().getPrimitiveType()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (slotList.contains(slotDesc.getId()) && null != slotDesc.getColumn()) {
|
||||
int outputScale = slotDesc.getColumn().getScale();
|
||||
if (outputScale >= 0) {
|
||||
if (outputScale > expr.getOutputScale()) {
|
||||
expr.setOutputScale(outputScale);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return combined explain string for all plan fragments.
|
||||
*/
|
||||
public String getExplainString(List<PlanFragment> fragments, ExplainOptions explainOptions) {
|
||||
public String getExplainString(ExplainOptions explainOptions) {
|
||||
Preconditions.checkNotNull(explainOptions);
|
||||
if (explainOptions.isGraph()) {
|
||||
// print the plan graph
|
||||
@ -138,7 +60,9 @@ public class Planner {
|
||||
}
|
||||
|
||||
// print text plan
|
||||
TExplainLevel explainLevel = explainOptions.isVerbose() ? TExplainLevel.VERBOSE : TExplainLevel.NORMAL;
|
||||
org.apache.doris.thrift.TExplainLevel
|
||||
explainLevel = explainOptions.isVerbose()
|
||||
? org.apache.doris.thrift.TExplainLevel.VERBOSE : org.apache.doris.thrift.TExplainLevel.NORMAL;
|
||||
StringBuilder str = new StringBuilder();
|
||||
for (int i = 0; i < fragments.size(); ++i) {
|
||||
PlanFragment fragment = fragments.get(i);
|
||||
@ -149,287 +73,20 @@ public class Planner {
|
||||
str.append("PLAN FRAGMENT " + i + "\n");
|
||||
str.append(fragment.getExplainString(explainLevel));
|
||||
}
|
||||
if (explainLevel == TExplainLevel.VERBOSE) {
|
||||
str.append(plannerContext.getRootAnalyzer().getDescTbl().getExplainString());
|
||||
if (explainLevel == org.apache.doris.thrift.TExplainLevel.VERBOSE) {
|
||||
appendTupleInfo(str);
|
||||
}
|
||||
return str.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in
|
||||
* a list such that element i of that list can only consume output of the following fragments j > i.
|
||||
*/
|
||||
public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
|
||||
throws UserException {
|
||||
QueryStmt queryStmt;
|
||||
if (statement instanceof InsertStmt) {
|
||||
queryStmt = ((InsertStmt) statement).getQueryStmt();
|
||||
} else {
|
||||
queryStmt = (QueryStmt) statement;
|
||||
}
|
||||
public void appendTupleInfo(StringBuilder stringBuilder) {}
|
||||
|
||||
plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement);
|
||||
singleNodePlanner = new SingleNodePlanner(plannerContext);
|
||||
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
|
||||
|
||||
if (VectorizedUtil.isVectorized()) {
|
||||
singleNodePlan.convertToVectoriezd();
|
||||
}
|
||||
|
||||
if (analyzer.getContext() != null
|
||||
&& analyzer.getContext().getSessionVariable().isEnableProjection()
|
||||
&& statement instanceof SelectStmt) {
|
||||
ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
|
||||
projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
|
||||
}
|
||||
|
||||
if (statement instanceof InsertStmt) {
|
||||
InsertStmt insertStmt = (InsertStmt) statement;
|
||||
insertStmt.prepareExpressions();
|
||||
}
|
||||
|
||||
// TODO chenhao16 , no used materialization work
|
||||
// compute referenced slots before calling computeMemLayout()
|
||||
//analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);
|
||||
|
||||
setResultExprScale(analyzer, queryStmt.getResultExprs());
|
||||
|
||||
// materialized view selector
|
||||
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
|
||||
if (selectFailed) {
|
||||
throw new MVSelectFailedException("Failed to select materialize view");
|
||||
}
|
||||
|
||||
/**
|
||||
* - Under normal circumstances, computeMemLayout() will be executed
|
||||
* at the end of the init function of the plan node.
|
||||
* Such as :
|
||||
* OlapScanNode {
|
||||
* init () {
|
||||
* analyzer.materializeSlots(conjuncts);
|
||||
* computeTupleStatAndMemLayout(analyzer);
|
||||
* computeStat();
|
||||
* }
|
||||
* }
|
||||
* - However Doris is currently unable to determine
|
||||
* whether it is possible to cut or increase the columns in the tuple after PlanNode.init().
|
||||
* - Therefore, for the time being, computeMemLayout() can only be placed
|
||||
* after the completion of the entire single node planner.
|
||||
*/
|
||||
analyzer.getDescTbl().computeMemLayout();
|
||||
singleNodePlan.finalize(analyzer);
|
||||
|
||||
if (queryOptions.num_nodes == 1) {
|
||||
// single-node execution; we're almost done
|
||||
singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan);
|
||||
fragments.add(new PlanFragment(plannerContext.getNextFragmentId(), singleNodePlan,
|
||||
DataPartition.UNPARTITIONED));
|
||||
} else {
|
||||
// all select query are unpartitioned.
|
||||
distributedPlanner = new DistributedPlanner(plannerContext);
|
||||
fragments = distributedPlanner.createPlanFragments(singleNodePlan);
|
||||
}
|
||||
|
||||
// Optimize the transfer of query statistic when query doesn't contain limit.
|
||||
PlanFragment rootFragment = fragments.get(fragments.size() - 1);
|
||||
QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer
|
||||
= new QueryStatisticsTransferOptimizer(rootFragment);
|
||||
queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
|
||||
|
||||
// Create runtime filters.
|
||||
if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase()
|
||||
.equals(TRuntimeFilterMode.OFF.name())) {
|
||||
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
|
||||
}
|
||||
|
||||
if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) {
|
||||
InsertStmt insertStmt = (InsertStmt) statement;
|
||||
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
|
||||
rootFragment.setSink(insertStmt.getDataSink());
|
||||
insertStmt.complete();
|
||||
ArrayList<Expr> exprs = ((InsertStmt) statement).getResultExprs();
|
||||
List<Expr> resExprs = Expr.substituteList(
|
||||
exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
|
||||
rootFragment.setOutputExprs(resExprs);
|
||||
} else {
|
||||
List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(),
|
||||
rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);
|
||||
rootFragment.setOutputExprs(resExprs);
|
||||
}
|
||||
LOG.debug("finalize plan fragments");
|
||||
for (PlanFragment fragment : fragments) {
|
||||
fragment.finalize(queryStmt);
|
||||
}
|
||||
|
||||
Collections.reverse(fragments);
|
||||
|
||||
pushDownResultFileSink(analyzer);
|
||||
|
||||
if (queryStmt instanceof SelectStmt) {
|
||||
SelectStmt selectStmt = (SelectStmt) queryStmt;
|
||||
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
|
||||
isBlockQuery = true;
|
||||
LOG.debug("this is block query");
|
||||
} else {
|
||||
isBlockQuery = false;
|
||||
LOG.debug("this isn't block query");
|
||||
}
|
||||
}
|
||||
public List<PlanFragment> getFragments() {
|
||||
return fragments;
|
||||
}
|
||||
|
||||
/**
|
||||
* If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise
|
||||
* returns root unchanged.
|
||||
*/
|
||||
private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
|
||||
throws UserException {
|
||||
Preconditions.checkNotNull(root);
|
||||
// List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds());
|
||||
|
||||
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
|
||||
if (conjuncts.isEmpty()) {
|
||||
return root;
|
||||
}
|
||||
// evaluate conjuncts in SelectNode
|
||||
SelectNode selectNode = new SelectNode(plannerContext.getNextNodeId(), root, conjuncts);
|
||||
selectNode.init(analyzer);
|
||||
Preconditions.checkState(selectNode.hasValidStats());
|
||||
return selectNode;
|
||||
public boolean isBlockQuery() {
|
||||
return isBlockQuery;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is mainly used to try to push the top-level result file sink down one layer.
|
||||
* The result file sink after the pushdown can realize the function of concurrently exporting the result set.
|
||||
* Push down needs to meet the following conditions:
|
||||
* 1. The query enables the session variable of the concurrent export result set
|
||||
* 2. The top-level fragment is not a merge change node
|
||||
* 3. The export method uses the s3 method
|
||||
*
|
||||
* After satisfying the above three conditions,
|
||||
* the result file sink and the associated output expr will be pushed down to the next layer.
|
||||
* The second plan fragment performs expression calculation and derives the result set.
|
||||
* The top plan fragment will only summarize the status of the exported result set and return it to fe.
|
||||
*/
|
||||
private void pushDownResultFileSink(Analyzer analyzer) {
|
||||
if (fragments.size() < 1) {
|
||||
return;
|
||||
}
|
||||
if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
|
||||
return;
|
||||
}
|
||||
if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
|
||||
return;
|
||||
}
|
||||
if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
|
||||
return;
|
||||
}
|
||||
PlanFragment topPlanFragment = fragments.get(0);
|
||||
ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot();
|
||||
// try to push down result file sink
|
||||
if (topPlanNode.isMergingExchange()) {
|
||||
return;
|
||||
}
|
||||
PlanFragment secondPlanFragment = fragments.get(1);
|
||||
ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink();
|
||||
if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) {
|
||||
return;
|
||||
}
|
||||
if (secondPlanFragment.getOutputExprs() != null) {
|
||||
return;
|
||||
}
|
||||
// create result file sink desc
|
||||
TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
|
||||
resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
|
||||
resultFileSink.setOutputTupleId(fileStatusDesc.getId());
|
||||
secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
|
||||
secondPlanFragment.resetSink(resultFileSink);
|
||||
ResultSink resultSink = new ResultSink(topPlanNode.getId());
|
||||
topPlanFragment.resetSink(resultSink);
|
||||
topPlanFragment.resetOutputExprs(fileStatusDesc);
|
||||
topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a tuple for file status, the tuple schema as following:
|
||||
* | FileNumber | Int |
|
||||
* | TotalRows | Bigint |
|
||||
* | FileSize | Bigint |
|
||||
* | URL | Varchar |
|
||||
*/
|
||||
private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {
    TupleDescriptor resultFileStatusTupleDesc =
            analyzer.getDescTbl().createTupleDescriptor("result_file_status");
    resultFileStatusTupleDesc.setIsMaterialized(true);
    SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
    fileNumber.setLabel("FileNumber");
    fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
    fileNumber.setIsMaterialized(true);
    fileNumber.setIsNullable(false);
    SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
    totalRows.setLabel("TotalRows");
    totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
    totalRows.setIsMaterialized(true);
    totalRows.setIsNullable(false);
    SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
    fileSize.setLabel("FileSize");
    fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
    fileSize.setIsMaterialized(true);
    fileSize.setIsNullable(false);
    SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
    url.setLabel("URL");
    url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
    url.setIsMaterialized(true);
    url.setIsNullable(false);
    resultFileStatusTupleDesc.computeStatAndMemLayout();
    return resultFileStatusTupleDesc;
}

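For reference, each row of result_file_status has the shape below; the record is a hypothetical illustration (not part of Doris), with field types mirroring the four slots above:

// Hypothetical model of one result_file_status row: INT, BIGINT, BIGINT, VARCHAR.
record FileStatus(int fileNumber, long totalRows, long fileSize, String url) { }
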
private static class QueryStatisticsTransferOptimizer {
    private final PlanFragment root;

    public QueryStatisticsTransferOptimizer(PlanFragment root) {
        Preconditions.checkNotNull(root);
        this.root = root;
    }

    public void optimizeQueryStatisticsTransfer() {
        optimizeQueryStatisticsTransfer(root, null);
    }

    private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) {
        if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) {
            fragment.setTransferQueryStatisticsWithEveryBatch(true);
        }
        for (PlanFragment child : fragment.getChildren()) {
            optimizeQueryStatisticsTransfer(child, fragment);
        }
    }

    // Check whether any exchange node under the ancestor that feeds the successor carries a limit.
    private boolean hasLimit(PlanNode ancestor, PlanNode successor) {
        final List<PlanNode> exchangeNodes = Lists.newArrayList();
        collectExchangeNode(ancestor, exchangeNodes);
        for (PlanNode leaf : exchangeNodes) {
            if (leaf.getChild(0) == successor
                    && leaf.hasLimit()) {
                return true;
            }
        }
        return false;
    }

    private void collectExchangeNode(PlanNode planNode, List<PlanNode> exchangeNodes) {
        if (planNode instanceof ExchangeNode) {
            exchangeNodes.add(planNode);
        }

        for (PlanNode child : planNode.getChildren()) {
            if (child instanceof ExchangeNode) {
                exchangeNodes.add(child);
            } else {
                collectExchangeNode(child, exchangeNodes);
            }
        }
    }
}
}

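A hedged reading of why this pass exists: when an exchange above a fragment carries a limit, the consumer may stop pulling batches early, so the producer fragment is told to ship its query statistics with every batch rather than only at end-of-stream. The runnable stand-in below models the hasLimit/collectExchangeNode traversal (the Node class is illustrative, not Doris's PlanNode):

import java.util.ArrayList;
import java.util.List;

// Stand-in model: find an exchange node under the parent's root that feeds
// the child's root and carries a limit.
class StatsTransferSketch {
    static final class Node {
        final boolean isExchange;
        final boolean hasLimit;
        final List<Node> children = new ArrayList<>();

        Node(boolean isExchange, boolean hasLimit) {
            this.isExchange = isExchange;
            this.hasLimit = hasLimit;
        }
    }

    static boolean feedsThroughLimitedExchange(Node node, Node childRoot) {
        if (node.isExchange && !node.children.isEmpty()
                && node.children.get(0) == childRoot && node.hasLimit) {
            return true;
        }
        for (Node child : node.children) {
            if (feedsThroughLimitedExchange(child, childRoot)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node childRoot = new Node(false, false);
        Node limitedExchange = new Node(true, true); // exchange that carries a LIMIT
        limitedExchange.children.add(childRoot);
        Node parentRoot = new Node(false, false);
        parentRoot.children.add(limitedExchange);
        System.out.println(feedsThroughLimitedExchange(parentRoot, childRoot)); // true
    }
}
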
@ -45,6 +45,7 @@ import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.proto.Data;
import org.apache.doris.service.FrontendOptions;
@ -201,7 +202,18 @@ public class ConnectProcessor {
        List<Pair<StatementBase, Data.PQueryStatistics>> auditInfoList = Lists.newArrayList();
        boolean alreadyAddedToAuditInfoList = false;
        try {
            List<StatementBase> stmts = analyze(originStmt);
            List<StatementBase> stmts = null;
            if (ctx.getSessionVariable().isEnableNereids()) {
                NereidsParser nereidsParser = new NereidsParser();
                try {
                    stmts = nereidsParser.parseSQL(originStmt);
                } catch (Exception e) {
                    LOG.warn("SQL : {}, parse failed by new parser", originStmt, e);
                }
            }
            if (stmts == null) {
                stmts = analyze(originStmt);
            }
            for (int i = 0; i < stmts.size(); ++i) {
                alreadyAddedToAuditInfoList = false;
                ctx.getState().reset();

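The hunk above is a try-the-new-parser, fall-back-to-the-legacy-path pattern; a minimal self-contained sketch of the same control flow (parseWithNereids/parseWithLegacy are stand-ins, not Doris methods):

import java.util.List;

// Minimal model of the parse-with-fallback flow used above.
class ParseFallbackSketch {
    static List<String> parse(String sql, boolean enableNereids) {
        List<String> stmts = null;
        if (enableNereids) {
            try {
                stmts = parseWithNereids(sql); // may fail on unsupported syntax
            } catch (Exception e) {
                // swallow and fall back; the legacy parser stays authoritative
            }
        }
        if (stmts == null) {
            stmts = parseWithLegacy(sql);
        }
        return stmts;
    }

    static List<String> parseWithNereids(String sql) {
        throw new UnsupportedOperationException("pretend the new parser rejects this");
    }

    static List<String> parseWithLegacy(String sql) {
        return List.of(sql);
    }

    public static void main(String[] args) {
        System.out.println(parse("SELECT 1", true)); // falls back, prints [SELECT 1]
    }
}
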
@ -192,6 +192,8 @@ public class SessionVariable implements Serializable, Writable {

    static final String ENABLE_ARRAY_TYPE = "enable_array_type";

    public static final String ENABLE_NEREIDS = "enable_nereids";

    // session origin value
    public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
    // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -471,6 +473,15 @@ public class SessionVariable implements Serializable, Writable {
    @VariableMgr.VarAttr(name = ENABLE_ARRAY_TYPE)
    private boolean enableArrayType = false;

    /**
     * As the new optimizer is not mature yet, use this variable to control
     * whether to use it. Remove this once the new optimizer is fully
     * developed. I hope that day comes soon.
     */
    @VariableMgr.VarAttr(name = ENABLE_NEREIDS)
    private boolean enableNereids = false;

    public String getBlockEncryptionMode() {
        return blockEncryptionMode;
    }
@ -970,6 +981,14 @@ public class SessionVariable implements Serializable, Writable {
        this.enableArrayType = enableArrayType;
    }

    public boolean isEnableNereids() {
        return enableNereids;
    }

    public void setEnableNereids(boolean enableNereids) {
        this.enableNereids = enableNereids;
    }

    /**
     * Serialize to thrift object.
     * Used for the REST API.

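Since enable_nereids is an ordinary session variable, it can presumably be toggled per connection in the usual way (e.g. SET enable_nereids = true;), or through setEnableNereids above in embedded and test code.
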
@ -84,7 +84,10 @@ import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlanAdapter;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
@ -729,10 +732,14 @@ public class StmtExecutor implements ProfileWriter {
            }
            plannerProfile.setQueryAnalysisFinishTime();

            // create plan
            planner = new Planner();
            if (parsedStmt instanceof LogicalPlanAdapter) {
                // create plan
                planner = new NereidsPlanner(context);
            } else {
                planner = new OriginalPlanner(analyzer);
            }
            if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
                planner.plan(parsedStmt, analyzer, tQueryOptions);
                planner.plan(parsedStmt, tQueryOptions);
            }
            // TODO(zc):
            // Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
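A minimal, self-contained sketch of this dispatch (all type names below are stand-ins, not the real Doris classes):

// Stand-in model: pick the planner implementation from the parsed statement's type.
interface PlannerSketch {
    void plan(Object parsedStmt, Object queryOptions);
}

class NereidsPlannerSketch implements PlannerSketch {
    public void plan(Object parsedStmt, Object queryOptions) {
        // new-optimizer path
    }
}

class OriginalPlannerSketch implements PlannerSketch {
    public void plan(Object parsedStmt, Object queryOptions) {
        // legacy path
    }
}

final class LogicalPlanAdapterSketch {
}

class PlannerDispatch {
    static PlannerSketch choose(Object parsedStmt) {
        return (parsedStmt instanceof LogicalPlanAdapterSketch)
                ? new NereidsPlannerSketch()
                : new OriginalPlannerSketch();
    }
}

Either way, the caller then invokes plan(parsedStmt, queryOptions) without caring which implementation it received.
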
@ -874,8 +881,12 @@ public class StmtExecutor implements ProfileWriter {
                newSelectStmt.reset();
                analyzer = new Analyzer(context.getCatalog(), context);
                newSelectStmt.analyze(analyzer);
                planner = new Planner();
                planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
                if (parsedStmt instanceof LogicalPlanAdapter) {
                    planner = new NereidsPlanner(context);
                } else {
                    planner = new OriginalPlanner(analyzer);
                }
                planner.plan(newSelectStmt, context.getSessionVariable().toThrift());
            }
        }
        sendResult(false, isSendFields, newSelectStmt, channel, cacheAnalyzer, cacheResult);
@ -929,7 +940,7 @@ public class StmtExecutor implements ProfileWriter {
        }

        if (queryStmt.isExplain()) {
            String explainString = planner.getExplainString(planner.getFragments(), queryStmt.getExplainOptions());
            String explainString = planner.getExplainString(queryStmt.getExplainOptions());
            handleExplainStmt(explainString);
            return;
        }
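Note the recurring signature change in this and the following hunks: getExplainString(fragments, options) becomes getExplainString(options), evidently because each planner implementation now holds its own fragments, so callers no longer pass them in.
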
@ -1245,7 +1256,7 @@ public class StmtExecutor implements ProfileWriter {
        if (insertStmt.getQueryStmt().isExplain()) {
            ExplainOptions explainOptions = insertStmt.getQueryStmt().getExplainOptions();
            insertStmt.setIsExplain(explainOptions);
            String explainString = planner.getExplainString(planner.getFragments(), explainOptions);
            String explainString = planner.getExplainString(explainOptions);
            handleExplainStmt(explainString);
            return;
        }

@ -20,7 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.utframe.DorisAssert;
@ -562,28 +562,28 @@ public class SelectStmtTest {
    @Test
    public void testSelectHintSetVar() throws Exception {
        String sql = "SELECT sleep(3);";
        Planner planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        OriginalPlanner planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        Assert.assertEquals(VariableMgr.getDefaultSessionVariable().getQueryTimeoutS(),
                planner.getPlannerContext().getQueryOptions().query_timeout);

        sql = "SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3);";
        planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        Assert.assertEquals(1, planner.getPlannerContext().getQueryOptions().query_timeout);

        sql = "select * from db1.partition_table where datekey=20200726";
        planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        Assert.assertEquals(VariableMgr.getDefaultSessionVariable().getMaxExecMemByte(),
                planner.getPlannerContext().getQueryOptions().mem_limit);

        sql = "select /*+ SET_VAR(exec_mem_limit = 8589934592) */ poi_id, count(*) from db1.partition_table "
                + "where datekey=20200726 group by 1";
        planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        Assert.assertEquals(8589934592L, planner.getPlannerContext().getQueryOptions().mem_limit);

        int queryTimeOut = dorisAssert.getSessionVariable().getQueryTimeoutS();
        long execMemLimit = dorisAssert.getSessionVariable().getMaxExecMemByte();
        sql = "select /*+ SET_VAR(exec_mem_limit = 8589934592, query_timeout = 1) */ 1 + 2;";
        planner = dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        planner = (OriginalPlanner) dorisAssert.query(sql).internalExecuteOneAndGetPlan();
        // session variables have been changed
        Assert.assertEquals(1, planner.getPlannerContext().getQueryOptions().query_timeout);
        Assert.assertEquals(8589934592L, planner.getPlannerContext().getQueryOptions().mem_limit);

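As these assertions show, a /*+ SET_VAR(...) */ hint overrides the session defaults only for the plan options of that single query.
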
@ -20,7 +20,7 @@ package org.apache.doris.nereids;
import org.apache.doris.nereids.analyzer.Unbound;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.parser.SqlParser;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.analysis.BindSlotReference;
@ -37,7 +37,7 @@ import java.util.List;

public class AnalyzeTest extends TestWithFeService {

    private final SqlParser parser = new SqlParser();
    private final NereidsParser parser = new NereidsParser();

    @Override
    protected void runBeforeAll() throws Exception {
@ -67,8 +67,8 @@ public class AnalyzeTest extends TestWithFeService {
        Assertions.assertTrue(checkBound(analyzed));
    }

    private LogicalPlan analyze(String sql) {
        LogicalPlan parsed = parser.parse(sql);
    private LogicalPlan analyze(String sql) throws Exception {
        LogicalPlan parsed = parser.parseSingle(sql);
        return analyze(parsed, connectContext);
    }

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.parser;

import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;

import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;

public class NereidsParserTest {

    @Test
    public void testParseMultiple() throws Exception {
        NereidsParser nereidsParser = new NereidsParser();
        String sql = "SELECT b FROM test;SELECT a FROM test;";
        List<LogicalPlan> logicalPlanList = nereidsParser.parseMultiple(sql);
        Assertions.assertEquals(2, logicalPlanList.size());
    }

    @Test
    public void testSingle() throws Exception {
        NereidsParser nereidsParser = new NereidsParser();
        String sql = "SELECT * FROM test;";
        Exception exceptionOccurred = null;
        try {
            nereidsParser.parseSingle(sql);
        } catch (Exception e) {
            exceptionOccurred = e;
            e.printStackTrace();
        }
        Assert.assertNull(exceptionOccurred);
    }
}
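These tests cover the new parser's two entry points: parseMultiple for input that carries several ';'-separated statements, and parseSingle for exactly one statement.
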
@ -17,7 +17,7 @@

package org.apache.doris.nereids.rules.expression.rewrite;

import org.apache.doris.nereids.parser.SqlParser;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.expression.rewrite.rules.NormalizeExpressionRule;
import org.apache.doris.nereids.rules.expression.rewrite.rules.SimplifyNotExprRule;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -29,7 +29,7 @@ import org.junit.Test;
 * all expr rewrite rule test case.
 */
public class ExpressionRewriteTest {
    private final SqlParser parser = new SqlParser();
    private static final NereidsParser PARSER = new NereidsParser();
    private ExpressionRuleExecutor executor;

    @Test
@ -57,8 +57,8 @@ public class ExpressionRewriteTest {
    }

    private void assertRewrite(String expression, String expected) {
        Expression needRewriteExpression = parser.createExpression(expression);
        Expression expectedExpression = parser.createExpression(expected);
        Expression needRewriteExpression = PARSER.createExpression(expression);
        Expression expectedExpression = PARSER.createExpression(expected);
        Expression rewrittenExpression = executor.rewrite(needRewriteExpression);
        Assert.assertEquals(expectedExpression, rewrittenExpression);
    }

@ -17,16 +17,16 @@

package org.apache.doris.nereids.trees.expressions;

import org.apache.doris.nereids.parser.SqlParser;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.TreeNode;

import org.junit.Test;

public class ExpressionParserTest {
    private static final SqlParser PARSER = new SqlParser();
    private static final NereidsParser PARSER = new NereidsParser();

    private void assertSql(String sql) throws Exception {
        TreeNode treeNode = PARSER.parse(sql);
        TreeNode treeNode = PARSER.parseSingle(sql);
        System.out.println(treeNode.toString());
    }

@ -50,7 +50,7 @@ public class ExpressionParserTest {
    @Test
    public void testSqlAnd() throws Exception {
        String sql = "select * from test1 where a > 1 and b > 1";
        TreeNode treeNode = PARSER.parse(sql);
        TreeNode treeNode = PARSER.parseSingle(sql);
        System.out.println(treeNode);
    }

@ -133,16 +133,14 @@ public class DistributedPlannerTest {
        StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)"));

        sql = "explain select * from db1.tbl1 join [SHUFFLE] db1.tbl2 on tbl1.k1 = tbl2.k3";
        stmtExecutor = new StmtExecutor(ctx, sql);
        stmtExecutor.execute();
        planner = stmtExecutor.planner();
        fragments = planner.getFragments();
        plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        plan = planner.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)"));
    }

@ -152,8 +150,7 @@ public class DistributedPlannerTest {
        StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)"));

        double originThreshold = ctx.getSessionVariable().autoBroadcastJoinThreshold;
@ -162,8 +159,7 @@ public class DistributedPlannerTest {
        stmtExecutor = new StmtExecutor(ctx, sql);
        stmtExecutor.execute();
        planner = stmtExecutor.planner();
        fragments = planner.getFragments();
        plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        plan = planner.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)"));
    } finally {
        ctx.getSessionVariable().autoBroadcastJoinThreshold = originThreshold;

@ -93,8 +93,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
        stmtExecutor1.execute();
        Planner planner1 = stmtExecutor1.planner();
        List<PlanFragment> fragments1 = planner1.getFragments();
        String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(false, false));
        String plan1 = planner1.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan1, "UNION"));
        String sql2 = "explain select * from db1.tbl1 where k1='a' and k4=1\n"
                + "union distinct\n"
@ -118,8 +117,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2);
        stmtExecutor2.execute();
        Planner planner2 = stmtExecutor2.planner();
        List<PlanFragment> fragments2 = planner2.getFragments();
        String plan2 = planner2.getExplainString(fragments2, new ExplainOptions(false, false));
        String plan2 = planner2.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(4, StringUtils.countMatches(plan2, "UNION"));

        // intersect
@ -134,8 +132,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor3 = new StmtExecutor(connectContext, sql3);
        stmtExecutor3.execute();
        Planner planner3 = stmtExecutor3.planner();
        List<PlanFragment> fragments3 = planner3.getFragments();
        String plan3 = planner3.getExplainString(fragments3, new ExplainOptions(false, false));
        String plan3 = planner3.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan3, "INTERSECT"));
        String sql4 = "explain select * from db1.tbl1 where k1='a' and k4=1\n"
                + "intersect distinct\n"
@ -160,8 +157,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor4 = new StmtExecutor(connectContext, sql4);
        stmtExecutor4.execute();
        Planner planner4 = stmtExecutor4.planner();
        List<PlanFragment> fragments4 = planner4.getFragments();
        String plan4 = planner4.getExplainString(fragments4, new ExplainOptions(false, false));
        String plan4 = planner4.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(3, StringUtils.countMatches(plan4, "INTERSECT"));

        // except
@ -176,8 +172,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor5 = new StmtExecutor(connectContext, sql5);
        stmtExecutor5.execute();
        Planner planner5 = stmtExecutor5.planner();
        List<PlanFragment> fragments5 = planner5.getFragments();
        String plan5 = planner5.getExplainString(fragments5, new ExplainOptions(false, false));
        String plan5 = planner5.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan5, "EXCEPT"));

        String sql6 = "select * from db1.tbl1 where k1='a' and k4=1\n"
@ -191,8 +186,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor6 = new StmtExecutor(connectContext, sql6);
        stmtExecutor6.execute();
        Planner planner6 = stmtExecutor6.planner();
        List<PlanFragment> fragments6 = planner6.getFragments();
        String plan6 = planner6.getExplainString(fragments6, new ExplainOptions(false, false));
        String plan6 = planner6.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan6, "EXCEPT"));

        String sql7 = "select * from db1.tbl1 where k1='a' and k4=1\n"
@ -206,8 +200,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor7 = new StmtExecutor(connectContext, sql7);
        stmtExecutor7.execute();
        Planner planner7 = stmtExecutor7.planner();
        List<PlanFragment> fragments7 = planner7.getFragments();
        String plan7 = planner7.getExplainString(fragments7, new ExplainOptions(false, false));
        String plan7 = planner7.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan7, "EXCEPT"));

        // mixed
@ -222,8 +215,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor8 = new StmtExecutor(connectContext, sql8);
        stmtExecutor8.execute();
        Planner planner8 = stmtExecutor8.planner();
        List<PlanFragment> fragments8 = planner8.getFragments();
        String plan8 = planner8.getExplainString(fragments8, new ExplainOptions(false, false));
        String plan8 = planner8.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(1, StringUtils.countMatches(plan8, "UNION"));
        Assert.assertEquals(1, StringUtils.countMatches(plan8, "INTERSECT"));
        Assert.assertEquals(1, StringUtils.countMatches(plan8, "EXCEPT"));
@ -251,8 +243,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor9 = new StmtExecutor(connectContext, sql9);
        stmtExecutor9.execute();
        Planner planner9 = stmtExecutor9.planner();
        List<PlanFragment> fragments9 = planner9.getFragments();
        String plan9 = planner9.getExplainString(fragments9, new ExplainOptions(false, false));
        String plan9 = planner9.getExplainString(new ExplainOptions(false, false));
        Assert.assertEquals(2, StringUtils.countMatches(plan9, "UNION"));
        Assert.assertEquals(3, StringUtils.countMatches(plan9, "INTERSECT"));
        Assert.assertEquals(2, StringUtils.countMatches(plan9, "EXCEPT"));
@ -362,8 +353,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
        stmtExecutor1.execute();
        Planner planner1 = stmtExecutor1.planner();
        List<PlanFragment> fragments1 = planner1.getFragments();
        String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(true, false));
        String plan1 = planner1.getExplainString(new ExplainOptions(true, false));
        Assert.assertEquals(2, StringUtils.countMatches(plan1, "nullIndicatorBit=0"));
    }

@ -413,8 +403,7 @@ public class PlannerTest extends TestWithFeService {
            e.printStackTrace();
        }
        Planner planner1 = stmtExecutor1.planner();
        List<PlanFragment> fragments1 = planner1.getFragments();
        String plan1 = planner1.getExplainString(fragments1, new ExplainOptions(false, false));
        String plan1 = planner1.getExplainString(new ExplainOptions(false, false));

        StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2);
        try {
@ -423,8 +412,7 @@ public class PlannerTest extends TestWithFeService {
            e.printStackTrace();
        }
        Planner planner2 = stmtExecutor2.planner();
        List<PlanFragment> fragments2 = planner2.getFragments();
        String plan2 = planner2.getExplainString(fragments2, new ExplainOptions(false, false));
        String plan2 = planner2.getExplainString(new ExplainOptions(false, false));

        Assert.assertEquals(plan1, plan2);
    };
@ -459,8 +447,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n"));
    }

@ -471,8 +458,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assertions.assertFalse(plan.contains("PREDICATES:"));
    }

@ -483,8 +469,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1, `k2` = 1\n"));
    }

@ -496,8 +481,7 @@ public class PlannerTest extends TestWithFeService {
        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
        stmtExecutor.execute();
        Planner planner = stmtExecutor.planner();
        List<PlanFragment> fragments = planner.getFragments();
        String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
        String plan = planner.getExplainString(new ExplainOptions(false, false));
        Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n"));
    }

@ -33,10 +33,10 @@ import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.EmptySetNode;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
@ -63,13 +63,7 @@ import java.util.Map;
import java.util.Set;

public class CoordinatorTest extends Coordinator {
    static Planner planner = new Planner();
    static ConnectContext context = new ConnectContext(null);

    static {
        context.setQueryId(new TUniqueId(1, 2));
        context.setQualifiedUser("root");
    }

    @Mocked
    static Catalog catalog;
@ -77,17 +71,25 @@ public class CoordinatorTest extends Coordinator {
    static EditLog editLog;
    @Mocked
    static FrontendOptions frontendOptions;

    static ConnectContext context = new ConnectContext(null);
    static Analyzer analyzer = new Analyzer(catalog, context);
    static OriginalPlanner originalPlanner = new OriginalPlanner(analyzer);

    static {
        context.setQueryId(new TUniqueId(1, 2));
        context.setQualifiedUser("root");
    }

    public CoordinatorTest() {
        super(context, analyzer, planner);
        super(context, analyzer, originalPlanner);
    }

    private static Coordinator coor;

    @Test
    public void testComputeColocateJoinInstanceParam() {
        Coordinator coordinator = new Coordinator(context, analyzer, planner);
        Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner);

        PlanFragmentId planFragmentId = new PlanFragmentId(1);
        int scanNodeId = 1;
@ -279,7 +281,7 @@ public class CoordinatorTest extends Coordinator {

    @Test
    public void testColocateJoinAssignment() {
        Coordinator coordinator = new Coordinator(context, analyzer, planner);
        Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner);

        PlanFragmentId planFragmentId = new PlanFragmentId(1);
        int scanNodeId = 1;
@ -505,7 +507,7 @@ public class CoordinatorTest extends Coordinator {

    @Test
    public void testComputeScanRangeAssignmentByScheduler() {
        Coordinator coordinator = new Coordinator(context, analyzer, planner);
        Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner);
        PlanFragmentId planFragmentId = new PlanFragmentId(1);
        int scanNodeId = 1;
        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
@ -589,7 +591,7 @@ public class CoordinatorTest extends Coordinator {

    @Test
    public void testGetExecHostPortForFragmentIDAndBucketSeq() {
        Coordinator coordinator = new Coordinator(context, analyzer, planner);
        Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner);
        PlanFragmentId planFragmentId = new PlanFragmentId(1);
        // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
@ -712,7 +714,7 @@ public class CoordinatorTest extends Coordinator {

    @Test
    public void testComputeScanRangeAssignment() {
        Coordinator coordinator = new Coordinator(context, analyzer, planner);
        Coordinator coordinator = new Coordinator(context, analyzer, originalPlanner);

        //TScanRangeLocations
        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();

@ -37,7 +37,7 @@ import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
@ -174,7 +174,7 @@ public class StmtExecutorTest {
    @Test
    public void testSelect(@Mocked QueryStmt queryStmt,
            @Mocked SqlParser parser,
            @Mocked Planner planner,
            @Mocked OriginalPlanner planner,
            @Mocked Coordinator coordinator) throws Exception {
        Catalog catalog = Catalog.getCurrentCatalog();
        Deencapsulation.setField(catalog, "canRead", new AtomicBoolean(true));
@ -211,7 +211,7 @@ public class StmtExecutorTest {
            minTimes = 0;
            result = symbol;

            planner.plan((QueryStmt) any, (Analyzer) any, (TQueryOptions) any);
            planner.plan((QueryStmt) any, (TQueryOptions) any);
            minTimes = 0;

            // mock coordinator

@ -204,7 +204,7 @@ public class DorisAssert {
            }
        }
        Planner planner = stmtExecutor.planner();
        String explainString = planner.getExplainString(planner.getFragments(), new ExplainOptions(false, false));
        String explainString = planner.getExplainString(new ExplainOptions(false, false));
        System.out.println(explainString);
        return explainString;
    }

@ -319,7 +319,7 @@ public abstract class TestWithFeService {
        stmtExecutor.execute();
        if (connectContext.getState().getStateType() != QueryState.MysqlStateType.ERR) {
            Planner planner = stmtExecutor.planner();
            return planner.getExplainString(planner.getFragments(), new ExplainOptions(isVerbose, false));
            return planner.getExplainString(new ExplainOptions(isVerbose, false));
        } else {
            return connectContext.getState().getErrorMessage();
        }

@ -282,7 +282,7 @@ public class UtFrameUtils {
        stmtExecutor.execute();
        if (ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
            Planner planner = stmtExecutor.planner();
            return planner.getExplainString(planner.getFragments(), new ExplainOptions(isVerbose, false));
            return planner.getExplainString(new ExplainOptions(isVerbose, false));
        } else {
            return ctx.getState().getErrorMessage();
        }