diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 693a90d958..4be8fa5308 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RuntimeFilter; @@ -297,6 +298,17 @@ public class Analyzer { private final long autoBroadcastJoinThreshold; + /** + * This property is mainly used to store the vectorized switch of the current query. + * true: the vectorization of the current query is turned on + * false: the vectorization of the current query is turned off. + * It is different from the vectorized switch`enableVectorizedEngine` of the session. + * It is only valid for a single query, while the session switch is valid for all queries in the session. + * It cannot be set directly by the user, only by inheritance from session`enableVectorizedEngine` + * or internal adjustment of the system. + */ + private boolean enableQueryVec; + public GlobalState(Catalog catalog, ConnectContext context) { this.catalog = catalog; this.context = context; @@ -347,6 +359,9 @@ public class Analyzer { // autoBroadcastJoinThreshold is a "final" field, must set an initial value for it autoBroadcastJoinThreshold = 0; } + if (context != null) { + enableQueryVec = context.getSessionVariable().enableVectorizedEngine(); + } } } @@ -643,9 +658,34 @@ public class Analyzer { return db.getTableOrAnalysisException(tblName.getTbl()); } - public ExprRewriter getExprRewriter() { return globalState.exprRewriter_; } + public ExprRewriter getExprRewriter() { + return globalState.exprRewriter_; + } - public ExprRewriter getMVExprRewriter() { return globalState.mvExprRewriter; } + public ExprRewriter getMVExprRewriter() { + return globalState.mvExprRewriter; + } + + /** + * Only the top-level `query vec` value of the query analyzer represents the value of the entire query. + * Other sub-analyzers cannot represent the value of `query vec`. + * @return + */ + public boolean enableQueryVec() { + if (ancestors.isEmpty()) { + return globalState.enableQueryVec; + } else { + return ancestors.get(ancestors.size() - 1).enableQueryVec(); + } + } + + /** + * Since analyzer cannot get sub-analyzers from top to bottom. + * So I can only set the `query vec` variable of the top level analyzer of query to true. + */ + public void disableQueryVec() { + globalState.enableQueryVec = false; + } /** * Return descriptor of registered table/alias. @@ -890,18 +930,47 @@ public class Analyzer { } /** - * All tuple of outer join tuple should be null in slot desc + * The main function of this method is to set the column property on the nullable side of the outer join + * to nullable in the case of vectorization. + * For example: + * Query: select * from t1 left join t2 on t1.k1=t2.k1 + * Origin: t2.k1 not null + * Result: t2.k1 is nullable + * + * @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable. + * It will report an error and fall back from vectorized mode to non-vectorized mode for execution. + * If the nullside column of the outer join is a column that must return non-null like count(*) + * then there is no way to force the column to be nullable. + * At this time, vectorization cannot support this situation, + * so it is necessary to fall back to non-vectorization for processing. + * For example: + * Query: select * from t1 left join (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1 + * Origin: tmp.k1 not null, tmp.count_k2 not null + * Result: throw VecNotImplException */ - public void changeAllOuterJoinTupleToNull() { + public void changeAllOuterJoinTupleToNull() throws VecNotImplException { for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) { for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { - slotDescriptor.setIsNullable(true); + changeSlotToNull(slotDescriptor); } } for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) { for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { - slotDescriptor.setIsNullable(true); + changeSlotToNull(slotDescriptor); + } + } + } + + private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException { + if (slotDescriptor.getSourceExprs().isEmpty()) { + slotDescriptor.setIsNullable(true); + return; + } + for (Expr sourceExpr : slotDescriptor.getSourceExprs()) { + if (!sourceExpr.isNullable()) { + throw new VecNotImplException("The slot (" + slotDescriptor.toString() + + ") could not be changed to nullable"); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 376fefb781..502fae51aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -37,6 +37,7 @@ import org.apache.doris.common.TableAliasGenerator; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlUtils; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; @@ -514,7 +515,9 @@ public class SelectStmt extends QueryStmt { // Change all outer join tuple to null here after analyze where and from clause // all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info // the solt desc nullable mark must be corrected to make sure BE exec query right. - analyzer.changeAllOuterJoinTupleToNull(); + if (VectorizedUtil.isVectorized()) { + analyzer.changeAllOuterJoinTupleToNull(); + } createSortInfo(analyzer); if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java new file mode 100644 index 0000000000..2c5d12e7d8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +public class VecNotImplException extends UserException { + public VecNotImplException(String msg) { + super(msg); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java index 349cf1e236..b094389db9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java @@ -17,14 +17,52 @@ package org.apache.doris.common.util; +import org.apache.doris.analysis.Analyzer; import org.apache.doris.qe.ConnectContext; public class VectorizedUtil { + /** + * 1. Return false if there is no current connection (Rule1 to be changed) + * 2. Returns the vectorized switch value of the query 'globalState.enableQueryVec' + * 3. If it is not currently a query, return the vectorized switch value of the session 'enableVectorizedEngine' + * @return true: vec. false: non-vec + */ public static boolean isVectorized() { - if (ConnectContext.get() == null) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { return false; } - return ConnectContext.get().getSessionVariable().enableVectorizedEngine(); + Analyzer analyzer = connectContext.getExecutor().getAnalyzer(); + if (analyzer == null) { + return connectContext.getSessionVariable().enableVectorizedEngine(); + } + return analyzer.enableQueryVec(); + } + + /** + * The purpose of this function is to turn off the vectorization switch for the current query. + * When the vectorization engine cannot meet the requirements of the current query, + * it will convert the current query into a non-vectorized query. + * Note that this will only change the **vectorization switch for a single query**, + * and will not affect other queries in the same session. + * Therefore, even if the vectorization switch of the current query is turned off, + * the vectorization properties of subsequent queries will not be affected. + * + * Session: set enable_vectorized_engine=true; + * Query1: select * from table (vec) + * Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec) + * Query3: select * from table (still vec) + */ + public static void switchToQueryNonVec() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return; + } + Analyzer analyzer = connectContext.getExecutor().getAnalyzer(); + if (analyzer == null) { + return; + } + analyzer.disableQueryVec(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d2f6e28b92..458dd99e5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -33,6 +33,7 @@ import org.apache.doris.common.util.ListUtil; import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.planner.DataPartition; @@ -230,7 +231,7 @@ public class Coordinator { this.scanNodes = planner.getScanNodes(); this.descTable = analyzer.getDescTbl().toThrift(); this.returnedAllResults = false; - this.queryOptions = context.getSessionVariable().toThrift(); + initQueryOptions(context); setFromUserProperty(analyzer); @@ -300,6 +301,11 @@ public class Coordinator { } } + private void initQueryOptions(ConnectContext context) { + this.queryOptions = context.getSessionVariable().toThrift(); + this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized()); + } + public long getJobId() { return jobId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 702c3db6c8..2de84f0705 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -66,6 +66,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.Version; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; @@ -75,6 +76,7 @@ import org.apache.doris.common.util.QueryPlannerProfile; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.EtlJobType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -117,7 +119,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -137,6 +138,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import com.google.protobuf.ByteString; + // Do one COM_QUERY process. // first: Parse receive byte array to statement struct. // second: Do handle function for statement. @@ -194,6 +197,10 @@ public class StmtExecutor implements ProfileWriter { this.coord = coord; } + public Analyzer getAnalyzer() { + return analyzer; + } + // At the end of query execution, we begin to add up profile private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { long currentTimestamp = System.currentTimeMillis(); @@ -566,24 +573,38 @@ public class StmtExecutor implements ProfileWriter { } // table id in tableList is in ascending order because that table map is a sorted map List tables = Lists.newArrayList(tableMap.values()); - MetaLockUtils.readLockTables(tables); - try { - analyzeAndGenerateQueryPlan(tQueryOptions); - } catch (MVSelectFailedException e) { - /** - * If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query. - * So, the query should be reanalyzed without mv rewritten and planner again. - * Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time. - */ - resetAnalyzerAndStmt(); - analyzeAndGenerateQueryPlan(tQueryOptions); - } catch (UserException e) { - throw e; - } catch (Exception e) { - LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); - throw new AnalysisException("Unexpected exception: " + e.getMessage()); - } finally { - MetaLockUtils.readUnlockTables(tables); + int analyzeTimes = 2; + for (int i = 1; i <= analyzeTimes; i++) { + MetaLockUtils.readLockTables(tables); + try { + analyzeAndGenerateQueryPlan(tQueryOptions); + break; + } catch (MVSelectFailedException e) { + /** + * If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query. + * So, the query should be reanalyzed without mv rewritten and planner again. + * Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time. + */ + if (i == analyzeTimes) { + throw e; + } else { + resetAnalyzerAndStmt(); + } + } catch (VecNotImplException e) { + if (i == analyzeTimes) { + throw e; + } else { + resetAnalyzerAndStmt(); + VectorizedUtil.switchToQueryNonVec(); + } + } catch (UserException e) { + throw e; + } catch (Exception e) { + LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); + throw new AnalysisException("Unexpected exception: " + e.getMessage()); + } finally { + MetaLockUtils.readUnlockTables(tables); + } } } else { try {