[Forbidden](Vec) Switch to non-vec engine when outer join + not null column (#8979)
* [Forbidden](Vec) Switch to non-vec engine when outer join + not null column Vectorized code will occur `core` in the case of ```outer join + not null column```, such as issue #7901 So we need to fall back from vectorized mode to non-vectorized mode when we encounter this situation. If the nullside column of the outer join is a column that must return non-null like count(*) then there is no way to force the column to be nullable. At this time, vectorization cannot support this situation, so it is necessary to fall back to non-vectorization for processing. For example: Query: set enable_vectorized_engine=true Query: select * from t1 left join (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1 Result: Query goes non-vectorized engine
This commit is contained in:
@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.IdGenerator;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.VecNotImplException;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.planner.PlanNode;
|
||||
import org.apache.doris.planner.RuntimeFilter;
|
||||
@ -297,6 +298,17 @@ public class Analyzer {
|
||||
|
||||
private final long autoBroadcastJoinThreshold;
|
||||
|
||||
/**
|
||||
* This property is mainly used to store the vectorized switch of the current query.
|
||||
* true: the vectorization of the current query is turned on
|
||||
* false: the vectorization of the current query is turned off.
|
||||
* It is different from the vectorized switch`enableVectorizedEngine` of the session.
|
||||
* It is only valid for a single query, while the session switch is valid for all queries in the session.
|
||||
* It cannot be set directly by the user, only by inheritance from session`enableVectorizedEngine`
|
||||
* or internal adjustment of the system.
|
||||
*/
|
||||
private boolean enableQueryVec;
|
||||
|
||||
public GlobalState(Catalog catalog, ConnectContext context) {
|
||||
this.catalog = catalog;
|
||||
this.context = context;
|
||||
@ -347,6 +359,9 @@ public class Analyzer {
|
||||
// autoBroadcastJoinThreshold is a "final" field, must set an initial value for it
|
||||
autoBroadcastJoinThreshold = 0;
|
||||
}
|
||||
if (context != null) {
|
||||
enableQueryVec = context.getSessionVariable().enableVectorizedEngine();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -643,9 +658,34 @@ public class Analyzer {
|
||||
return db.getTableOrAnalysisException(tblName.getTbl());
|
||||
}
|
||||
|
||||
public ExprRewriter getExprRewriter() { return globalState.exprRewriter_; }
|
||||
public ExprRewriter getExprRewriter() {
|
||||
return globalState.exprRewriter_;
|
||||
}
|
||||
|
||||
public ExprRewriter getMVExprRewriter() { return globalState.mvExprRewriter; }
|
||||
public ExprRewriter getMVExprRewriter() {
|
||||
return globalState.mvExprRewriter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only the top-level `query vec` value of the query analyzer represents the value of the entire query.
|
||||
* Other sub-analyzers cannot represent the value of `query vec`.
|
||||
* @return
|
||||
*/
|
||||
public boolean enableQueryVec() {
|
||||
if (ancestors.isEmpty()) {
|
||||
return globalState.enableQueryVec;
|
||||
} else {
|
||||
return ancestors.get(ancestors.size() - 1).enableQueryVec();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Since analyzer cannot get sub-analyzers from top to bottom.
|
||||
* So I can only set the `query vec` variable of the top level analyzer of query to true.
|
||||
*/
|
||||
public void disableQueryVec() {
|
||||
globalState.enableQueryVec = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return descriptor of registered table/alias.
|
||||
@ -890,18 +930,47 @@ public class Analyzer {
|
||||
}
|
||||
|
||||
/**
|
||||
* All tuple of outer join tuple should be null in slot desc
|
||||
* The main function of this method is to set the column property on the nullable side of the outer join
|
||||
* to nullable in the case of vectorization.
|
||||
* For example:
|
||||
* Query: select * from t1 left join t2 on t1.k1=t2.k1
|
||||
* Origin: t2.k1 not null
|
||||
* Result: t2.k1 is nullable
|
||||
*
|
||||
* @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable.
|
||||
* It will report an error and fall back from vectorized mode to non-vectorized mode for execution.
|
||||
* If the nullside column of the outer join is a column that must return non-null like count(*)
|
||||
* then there is no way to force the column to be nullable.
|
||||
* At this time, vectorization cannot support this situation,
|
||||
* so it is necessary to fall back to non-vectorization for processing.
|
||||
* For example:
|
||||
* Query: select * from t1 left join (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1
|
||||
* Origin: tmp.k1 not null, tmp.count_k2 not null
|
||||
* Result: throw VecNotImplException
|
||||
*/
|
||||
public void changeAllOuterJoinTupleToNull() {
|
||||
public void changeAllOuterJoinTupleToNull() throws VecNotImplException {
|
||||
for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) {
|
||||
for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
|
||||
slotDescriptor.setIsNullable(true);
|
||||
changeSlotToNull(slotDescriptor);
|
||||
}
|
||||
}
|
||||
|
||||
for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) {
|
||||
for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
|
||||
slotDescriptor.setIsNullable(true);
|
||||
changeSlotToNull(slotDescriptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException {
|
||||
if (slotDescriptor.getSourceExprs().isEmpty()) {
|
||||
slotDescriptor.setIsNullable(true);
|
||||
return;
|
||||
}
|
||||
for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
|
||||
if (!sourceExpr.isNullable()) {
|
||||
throw new VecNotImplException("The slot (" + slotDescriptor.toString()
|
||||
+ ") could not be changed to nullable");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.common.TableAliasGenerator;
|
||||
import org.apache.doris.common.TreeNode;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.SqlUtils;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.ExprRewriter;
|
||||
@ -514,7 +515,9 @@ public class SelectStmt extends QueryStmt {
|
||||
// Change all outer join tuple to null here after analyze where and from clause
|
||||
// all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info
|
||||
// the solt desc nullable mark must be corrected to make sure BE exec query right.
|
||||
analyzer.changeAllOuterJoinTupleToNull();
|
||||
if (VectorizedUtil.isVectorized()) {
|
||||
analyzer.changeAllOuterJoinTupleToNull();
|
||||
}
|
||||
|
||||
createSortInfo(analyzer);
|
||||
if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) {
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
public class VecNotImplException extends UserException {
|
||||
public VecNotImplException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
@ -17,14 +17,52 @@
|
||||
|
||||
package org.apache.doris.common.util;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
public class VectorizedUtil {
|
||||
/**
|
||||
* 1. Return false if there is no current connection (Rule1 to be changed)
|
||||
* 2. Returns the vectorized switch value of the query 'globalState.enableQueryVec'
|
||||
* 3. If it is not currently a query, return the vectorized switch value of the session 'enableVectorizedEngine'
|
||||
* @return true: vec. false: non-vec
|
||||
*/
|
||||
public static boolean isVectorized() {
|
||||
if (ConnectContext.get() == null) {
|
||||
ConnectContext connectContext = ConnectContext.get();
|
||||
if (connectContext == null) {
|
||||
return false;
|
||||
}
|
||||
return ConnectContext.get().getSessionVariable().enableVectorizedEngine();
|
||||
Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
|
||||
if (analyzer == null) {
|
||||
return connectContext.getSessionVariable().enableVectorizedEngine();
|
||||
}
|
||||
return analyzer.enableQueryVec();
|
||||
}
|
||||
|
||||
/**
|
||||
* The purpose of this function is to turn off the vectorization switch for the current query.
|
||||
* When the vectorization engine cannot meet the requirements of the current query,
|
||||
* it will convert the current query into a non-vectorized query.
|
||||
* Note that this will only change the **vectorization switch for a single query**,
|
||||
* and will not affect other queries in the same session.
|
||||
* Therefore, even if the vectorization switch of the current query is turned off,
|
||||
* the vectorization properties of subsequent queries will not be affected.
|
||||
*
|
||||
* Session: set enable_vectorized_engine=true;
|
||||
* Query1: select * from table (vec)
|
||||
* Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec)
|
||||
* Query3: select * from table (still vec)
|
||||
*/
|
||||
public static void switchToQueryNonVec() {
|
||||
ConnectContext connectContext = ConnectContext.get();
|
||||
if (connectContext == null) {
|
||||
return;
|
||||
}
|
||||
Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
|
||||
if (analyzer == null) {
|
||||
return;
|
||||
}
|
||||
analyzer.disableQueryVec();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ import org.apache.doris.common.util.ListUtil;
|
||||
import org.apache.doris.common.util.ProfileWriter;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.load.LoadErrorHub;
|
||||
import org.apache.doris.load.loadv2.LoadJob;
|
||||
import org.apache.doris.planner.DataPartition;
|
||||
@ -230,7 +231,7 @@ public class Coordinator {
|
||||
this.scanNodes = planner.getScanNodes();
|
||||
this.descTable = analyzer.getDescTbl().toThrift();
|
||||
this.returnedAllResults = false;
|
||||
this.queryOptions = context.getSessionVariable().toThrift();
|
||||
initQueryOptions(context);
|
||||
|
||||
setFromUserProperty(analyzer);
|
||||
|
||||
@ -300,6 +301,11 @@ public class Coordinator {
|
||||
}
|
||||
}
|
||||
|
||||
private void initQueryOptions(ConnectContext context) {
|
||||
this.queryOptions = context.getSessionVariable().toThrift();
|
||||
this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
|
||||
}
|
||||
|
||||
public long getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
@ -66,6 +66,7 @@ import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.VecNotImplException;
|
||||
import org.apache.doris.common.Version;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
@ -75,6 +76,7 @@ import org.apache.doris.common.util.QueryPlannerProfile;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.mysql.MysqlChannel;
|
||||
@ -117,7 +119,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -137,6 +138,8 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
// Do one COM_QUERY process.
|
||||
// first: Parse receive byte array to statement struct.
|
||||
// second: Do handle function for statement.
|
||||
@ -194,6 +197,10 @@ public class StmtExecutor implements ProfileWriter {
|
||||
this.coord = coord;
|
||||
}
|
||||
|
||||
public Analyzer getAnalyzer() {
|
||||
return analyzer;
|
||||
}
|
||||
|
||||
// At the end of query execution, we begin to add up profile
|
||||
private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) {
|
||||
long currentTimestamp = System.currentTimeMillis();
|
||||
@ -566,24 +573,38 @@ public class StmtExecutor implements ProfileWriter {
|
||||
}
|
||||
// table id in tableList is in ascending order because that table map is a sorted map
|
||||
List<Table> tables = Lists.newArrayList(tableMap.values());
|
||||
MetaLockUtils.readLockTables(tables);
|
||||
try {
|
||||
analyzeAndGenerateQueryPlan(tQueryOptions);
|
||||
} catch (MVSelectFailedException e) {
|
||||
/**
|
||||
* If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query.
|
||||
* So, the query should be reanalyzed without mv rewritten and planner again.
|
||||
* Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time.
|
||||
*/
|
||||
resetAnalyzerAndStmt();
|
||||
analyzeAndGenerateQueryPlan(tQueryOptions);
|
||||
} catch (UserException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
|
||||
throw new AnalysisException("Unexpected exception: " + e.getMessage());
|
||||
} finally {
|
||||
MetaLockUtils.readUnlockTables(tables);
|
||||
int analyzeTimes = 2;
|
||||
for (int i = 1; i <= analyzeTimes; i++) {
|
||||
MetaLockUtils.readLockTables(tables);
|
||||
try {
|
||||
analyzeAndGenerateQueryPlan(tQueryOptions);
|
||||
break;
|
||||
} catch (MVSelectFailedException e) {
|
||||
/**
|
||||
* If there is MVSelectFailedException after the first planner, there will be error mv rewritten in query.
|
||||
* So, the query should be reanalyzed without mv rewritten and planner again.
|
||||
* Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time.
|
||||
*/
|
||||
if (i == analyzeTimes) {
|
||||
throw e;
|
||||
} else {
|
||||
resetAnalyzerAndStmt();
|
||||
}
|
||||
} catch (VecNotImplException e) {
|
||||
if (i == analyzeTimes) {
|
||||
throw e;
|
||||
} else {
|
||||
resetAnalyzerAndStmt();
|
||||
VectorizedUtil.switchToQueryNonVec();
|
||||
}
|
||||
} catch (UserException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
|
||||
throw new AnalysisException("Unexpected exception: " + e.getMessage());
|
||||
} finally {
|
||||
MetaLockUtils.readUnlockTables(tables);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user