diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 21c950c40c..1ae1864ad3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -107,6 +107,7 @@ public class NereidsPlanner extends Planner { @Override public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions queryOptions) { + this.queryOptions = queryOptions; if (statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) { NereidsTracer.init(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index deb7f5f76c..4d026cf9ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.ShortCircuitQueryContext; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.statistics.Statistics; @@ -122,8 +123,8 @@ public class StatementContext implements Closeable { // generate for next id for prepared statement's placeholders, which is connection level private final IdGenerator placeHolderIdGenerator = PlaceholderId.createGenerator(); - // relation id to placeholders for prepared statement - private final Map idToPlaceholderRealExpr = new HashMap<>(); + // relation id to placeholders for prepared statement, ordered by placeholder id + private final Map idToPlaceholderRealExpr = new TreeMap<>(); // collect all hash join conditions to compute node connectivity in join graph private final List joinFilters = new ArrayList<>(); @@ -164,6 +165,12 @@ public class StatementContext implements Closeable { // form this map private final Map relationIdToStatisticsMap = new LinkedHashMap<>(); + // Indicates the query is short-circuited in both plan and execution phase, typically + // for high speed/concurrency point queries + private boolean isShortCircuitQuery; + + private ShortCircuitQueryContext shortCircuitQueryContext; + public StatementContext() { this(ConnectContext.get(), null, 0); } @@ -235,6 +242,22 @@ public class StatementContext implements Closeable { } } + public boolean isShortCircuitQuery() { + return isShortCircuitQuery; + } + + public void setShortCircuitQuery(boolean shortCircuitQuery) { + isShortCircuitQuery = shortCircuitQuery; + } + + public ShortCircuitQueryContext getShortCircuitQueryContext() { + return shortCircuitQueryContext; + } + + public void setShortCircuitQueryContext(ShortCircuitQueryContext shortCircuitQueryContext) { + this.shortCircuitQueryContext = shortCircuitQueryContext; + } + public Optional getSqlCacheContext() { return Optional.ofNullable(sqlCacheContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 28307ec7b3..a240bebd90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -82,6 +82,7 @@ import org.apache.doris.nereids.rules.rewrite.InferPredicates; import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct; import org.apache.doris.nereids.rules.rewrite.InlineLogicalView; import org.apache.doris.nereids.rules.rewrite.LimitSortToTopN; +import org.apache.doris.nereids.rules.rewrite.LogicalResultSinkToShortCircuitPointQuery; import org.apache.doris.nereids.rules.rewrite.MergeAggregate; import org.apache.doris.nereids.rules.rewrite.MergeFilters; import org.apache.doris.nereids.rules.rewrite.MergeOneRowRelationIntoUnion; @@ -398,6 +399,8 @@ public class Rewriter extends AbstractBatchJobExecutor { topic("topn optimize", topDown(new DeferMaterializeTopNResult()) ), + topic("Point query short circuit", + topDown(new LogicalResultSinkToShortCircuitPointQuery())), topic("eliminate", // SORT_PRUNING should be applied after mergeLimit custom(RuleType.ELIMINATE_SORT, EliminateSort::new), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 864d8fd6bd..721f9f8dff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -340,6 +340,8 @@ public enum RuleType { BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_PROJECT_SCAN(RuleTypeClass.REWRITE), BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_FILTER_SCAN(RuleTypeClass.REWRITE), BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_AGG_SCAN(RuleTypeClass.REWRITE), + // short circuit rule + SHOR_CIRCUIT_POINT_QUERY(RuleTypeClass.REWRITE), // exploration rules REORDER_INTERSECT(RuleTypeClass.EXPLORATION), TEST_EXPLORATION(RuleTypeClass.EXPLORATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java new file mode 100644 index 0000000000..1438edb9bd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Set; + +/** + * short circuit query optimization + * pattern : select xxx from tbl where key = ? + */ +public class LogicalResultSinkToShortCircuitPointQuery implements RewriteRuleFactory { + + private Expression removeCast(Expression expression) { + if (expression instanceof Cast) { + return expression.child(0); + } + return expression; + } + + private boolean filterMatchShortCircuitCondition(LogicalFilter filter) { + return filter.getConjuncts().stream().allMatch( + // all conjuncts match with pattern `key = ?` + expression -> (expression instanceof EqualTo) + && (removeCast(expression.child(0)).isKeyColumnFromTable() + || ((SlotReference) expression.child(0)).getName().equals(Column.DELETE_SIGN)) + && expression.child(1).isLiteral()); + } + + private boolean scanMatchShortCircuitCondition(LogicalOlapScan olapScan) { + if (!ConnectContext.get().getSessionVariable().enableShortCircuitQuery) { + return false; + } + OlapTable olapTable = olapScan.getTable(); + return olapTable.getEnableLightSchemaChange() && olapTable.getEnableUniqueKeyMergeOnWrite() + && olapTable.storeRowColumn(); + } + + // set short circuit flag and return the original plan + private Plan shortCircuit(Plan root, OlapTable olapTable, + Set conjuncts, StatementContext statementContext) { + // All key columns in conjuncts + Set colNames = Sets.newHashSet(); + for (Expression expr : conjuncts) { + colNames.add(((SlotReference) removeCast((expr.child(0)))).getName()); + } + // set short circuit flag and modify nothing to the plan + if (olapTable.getBaseSchemaKeyColumns().size() <= colNames.size()) { + statementContext.setShortCircuitQuery(true); + } + return root; + } + + @Override + public List buildRules() { + return ImmutableList.of( + RuleType.SHOR_CIRCUIT_POINT_QUERY.build( + logicalResultSink(logicalProject(logicalFilter(logicalOlapScan() + .when(this::scanMatchShortCircuitCondition) + ).when(this::filterMatchShortCircuitCondition))) + .thenApply(ctx -> { + return shortCircuit(ctx.root, ctx.root.child().child().child().getTable(), + + ctx.root.child().child().getConjuncts(), ctx.statementContext); + })), + RuleType.SHOR_CIRCUIT_POINT_QUERY.build( + logicalResultSink(logicalFilter(logicalOlapScan() + .when(this::scanMatchShortCircuitCondition) + ).when(this::filterMatchShortCircuitCondition)) + .thenApply(ctx -> { + return shortCircuit(ctx.root, ctx.root.child().child().getTable(), + ctx.root.child().getConjuncts(), ctx.statementContext); + })) + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java index 75cef0fc94..f6c7cbdb66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java @@ -390,6 +390,11 @@ public abstract class Expression extends AbstractTreeNode implements return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent(); } + public boolean isKeyColumnFromTable() { + return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent() + && ((SlotReference) this).getColumn().get().isKey(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java index 9c1096799b..d81206f710 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java @@ -539,7 +539,7 @@ public abstract class Literal extends Expression implements LeafExpression, Comp microsecond = data.getInt(); } if (Config.enable_date_conversion) { - return new DateTimeV2Literal(year, month, day, hour, minute, second, microsecond); + return new DateTimeV2Literal(DateTimeV2Type.MAX, year, month, day, hour, minute, second, microsecond); } return new DateTimeLiteral(DateTimeType.INSTANCE, year, month, day, hour, minute, second, microsecond); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java index f1d410100e..be3cb645fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java @@ -23,7 +23,7 @@ import org.apache.doris.common.IdGenerator; /** * placeholder id for prepared statement parameters */ -public class PlaceholderId extends Id { +public class PlaceholderId extends Id implements Comparable { public PlaceholderId(int id) { super(id); @@ -55,4 +55,9 @@ public class PlaceholderId extends Id { public int hashCode() { return super.hashCode(); } + + @Override + public int compareTo(PlaceholderId o) { + return this.id - o.id; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java index b098f88364..d5260a72cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.Queriable; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -24,10 +25,13 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.PointQueryExecutor; import org.apache.doris.qe.PreparedStatementContext; +import org.apache.doris.qe.ShortCircuitQueryContext; import org.apache.doris.qe.StmtExecutor; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -65,8 +69,26 @@ public class ExecuteCommand extends Command { LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(prepareCommand.getLogicalPlan(), executor.getContext() .getStatementContext()); executor.setParsedStmt(planAdapter); - // execute real statement - executor.execute(); + // If it's not a short circuit query or schema version is different(indicates schema changed), + // need to do reanalyze and plan + boolean needAnalyze = !executor.getContext().getStatementContext().isShortCircuitQuery() + || (preparedStmtCtx.shortCircuitQueryContext.isPresent() + && preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion() + != preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion); + if (needAnalyze) { + // execute real statement + preparedStmtCtx.shortCircuitQueryContext = Optional.empty(); + statementContext.setShortCircuitQueryContext(null); + executor.execute(); + if (executor.getContext().getStatementContext().isShortCircuitQuery()) { + // cache short-circuit plan + preparedStmtCtx.shortCircuitQueryContext = Optional.of( + new ShortCircuitQueryContext(executor.planner(), (Queriable) executor.getParsedStmt())); + statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get()); + } + return; + } + PointQueryExecutor.directExecuteShortCircuitQuery(executor, preparedStmtCtx, statementContext); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 4d86a89f27..d312992bc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1122,7 +1122,8 @@ public class OlapScanNode extends ScanNode { public boolean isPointQuery() { return this.pointQueryEqualPredicats != null - || (preparedStatment != null && preparedStatment.isPointQueryShortCircuit()); + || (preparedStatment != null && preparedStatment.isPointQueryShortCircuit()) + || ConnectContext.get().getStatementContext().isShortCircuitQuery(); } private void computeTabletInfo() throws UserException { @@ -1250,6 +1251,7 @@ public class OlapScanNode extends ScanNode { scanTabletIds.clear(); bucketSeq2locations.clear(); scanReplicaIds.clear(); + sampleTabletIds.clear(); try { createScanRangeLocations(); } catch (AnalysisException e) { @@ -1350,7 +1352,7 @@ public class OlapScanNode extends ScanNode { output.append(prefix).append("pushAggOp=").append(pushDownAggNoGroupingOp).append("\n"); } if (isPointQuery()) { - output.append(prefix).append("SHORT-CIRCUIT"); + output.append(prefix).append("SHORT-CIRCUIT\n"); } if (!CollectionUtils.isEmpty(rewrittenProjectList)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java new file mode 100644 index 0000000000..3f5c24f563 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Status; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.KeyTuple; +import org.apache.doris.proto.Types; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.rpc.TCustomProtocolFactory; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class PointQueryExecutor implements CoordInterface { + private static final Logger LOG = LogManager.getLogger(PointQueryExecutor.class); + private long tabletID = 0; + private long timeoutMs = Config.point_query_timeout_ms; // default 10s + + private boolean isCancel = false; + private List candidateBackends; + private final int maxMsgSizeOfResultReceiver; + + // used for snapshot read in cloud mode + private List snapshotVisibleVersions; + + private final ShortCircuitQueryContext shortCircuitQueryContext; + + public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) { + ctx.sanitize(); + this.shortCircuitQueryContext = ctx; + this.maxMsgSizeOfResultReceiver = maxMessageSize; + } + + void setScanRangeLocations() throws Exception { + OlapScanNode scanNode = shortCircuitQueryContext.scanNode; + // compute scan range + List locations = scanNode.lazyEvaluateRangeLocations(); + Preconditions.checkNotNull(locations); + if (scanNode.getScanTabletIds().isEmpty()) { + return; + } + Preconditions.checkState(scanNode.getScanTabletIds().size() == 1); + this.tabletID = scanNode.getScanTabletIds().get(0); + + candidateBackends = new ArrayList<>(); + for (Long backendID : scanNode.getScanBackendIds()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(backendID); + if (SimpleScheduler.isAvailable(backend)) { + candidateBackends.add(backend); + } + } + // Random read replicas + Collections.shuffle(this.candidateBackends); + if (LOG.isDebugEnabled()) { + LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID); + } + } + + // execute query without analyze & plan + public static void directExecuteShortCircuitQuery(StmtExecutor executor, + PreparedStatementContext preparedStmtCtx, + StatementContext statementContext) throws Exception { + Preconditions.checkNotNull(preparedStmtCtx.shortCircuitQueryContext); + ShortCircuitQueryContext shortCircuitQueryContext = preparedStmtCtx.shortCircuitQueryContext.get(); + // update conjuncts + List conjunctVals = statementContext.getIdToPlaceholderRealExpr().values().stream().map( + expression -> ( + (Literal) expression).toLegacyLiteral()) + .collect(Collectors.toList()); + if (conjunctVals.size() != preparedStmtCtx.command.placeholderCount()) { + throw new AnalysisException("Mismatched conjuncts values size with prepared" + + "statement parameters size, expected " + + preparedStmtCtx.command.placeholderCount() + + ", but meet " + conjunctVals.size()); + } + updateScanNodeConjuncts(shortCircuitQueryContext.scanNode, conjunctVals); + // short circuit plan and execution + executor.executeAndSendResult(false, false, + shortCircuitQueryContext.analzyedQuery, executor.getContext() + .getMysqlChannel(), null, null); + } + + private static void updateScanNodeConjuncts(OlapScanNode scanNode, List conjunctVals) { + for (int i = 0; i < conjunctVals.size(); ++i) { + BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i); + if (binaryPredicate.getChild(0) instanceof LiteralExpr) { + binaryPredicate.setChild(0, conjunctVals.get(i)); + } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) { + binaryPredicate.setChild(1, conjunctVals.get(i)); + } else { + Preconditions.checkState(false, "Should conatains literal in " + binaryPredicate.toSqlImpl()); + } + } + } + + public void setTimeout(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + + void addKeyTuples( + InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { + // TODO handle IN predicates + KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); + for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) { + BinaryPredicate predicate = (BinaryPredicate) expr; + kBuilder.addKeyColumnRep(predicate.getChild(1).getStringValue()); + } + requestBuilder.addKeyTuples(kBuilder); + } + + @Override + public void cancel(Types.PPlanFragmentCancelReason cancelReason) { + // Do nothing + } + + + @Override + public RowBatch getNext() throws Exception { + setScanRangeLocations(); + // No partition/tablet found return emtpy row batch + if (candidateBackends == null || candidateBackends.isEmpty()) { + return new RowBatch(); + } + Iterator backendIter = candidateBackends.iterator(); + RowBatch rowBatch = null; + int tryCount = 0; + int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size()); + Status status = new Status(); + do { + Backend backend = backendIter.next(); + rowBatch = getNextInternal(status, backend); + if (rowBatch != null) { + break; + } + if (++tryCount >= maxTry) { + break; + } + } while (true); + // handle status code + if (!status.ok()) { + if (Strings.isNullOrEmpty(status.getErrorMsg())) { + status.rewriteErrorMsg(); + } + String errMsg = status.getErrorMsg(); + LOG.warn("query failed: {}", errMsg); + if (status.isRpcError()) { + throw new RpcException(null, errMsg); + } else { + // hide host info + int hostIndex = errMsg.indexOf("host"); + if (hostIndex != -1) { + errMsg = errMsg.substring(0, hostIndex); + } + throw new UserException(errMsg); + } + } + return rowBatch; + } + + @Override + public void exec() throws Exception { + // Point queries don't need to do anthing in execution phase. + // only handles in getNext() + } + + private RowBatch getNextInternal(Status status, Backend backend) throws TException { + long timeoutTs = System.currentTimeMillis() + timeoutMs; + RowBatch rowBatch = new RowBatch(); + InternalService.PTabletKeyLookupResponse pResult = null; + try { + Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable); + + InternalService.PTabletKeyLookupRequest.Builder requestBuilder + = InternalService.PTabletKeyLookupRequest.newBuilder() + .setTabletId(tabletID) + .setDescTbl(shortCircuitQueryContext.serializedDescTable) + .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr) + .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions) + .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE); + if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) { + requestBuilder.setVersion(snapshotVisibleVersions.get(0)); + } + if (shortCircuitQueryContext.cacheID != null) { + InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder(); + uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits()); + uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits()); + requestBuilder.setUuid(uuidBuilder); + } + addKeyTuples(requestBuilder); + + InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); + Future futureResponse = + BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request); + long currentTs = System.currentTimeMillis(); + if (currentTs >= timeoutTs) { + LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request timeout"); + return null; + } + try { + pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // continue to get result + LOG.warn("future get interrupted Exception"); + if (isCancel) { + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); + return null; + } + } catch (TimeoutException e) { + futureResponse.cancel(true); + LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch result timeout"); + return null; + } + } catch (RpcException e) { + LOG.warn("query fetch rpc exception {}, e {}", backend.getBrpcAddress(), e); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); + SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); + return null; + } catch (ExecutionException e) { + LOG.warn("query fetch execution exception {}, addr {}", e, backend.getBrpcAddress()); + if (e.getMessage().contains("time out")) { + // if timeout, we set error code to TIMEOUT, and it will not retry querying. + status.updateStatus(TStatusCode.TIMEOUT, e.getMessage()); + } else { + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); + SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); + } + return null; + } + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); + return null; + } + + if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) { + LOG.debug("get empty rowbatch"); + rowBatch.setEos(true); + status.updateStatus(TStatusCode.OK, ""); + return rowBatch; + } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { + byte[] serialResult = pResult.getRowBatch().toByteArray(); + TResultBatch resultBatch = new TResultBatch(); + TDeserializer deserializer = new TDeserializer( + new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); + try { + deserializer.deserialize(resultBatch, serialResult); + } catch (TException e) { + if (e.getMessage().contains("MaxMessageSize reached")) { + throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); + } else { + throw e; + } + } + rowBatch.setBatch(resultBatch); + rowBatch.setEos(true); + status.updateStatus(TStatusCode.OK, ""); + return rowBatch; + } else { + Preconditions.checkState(false, "No row batch or empty batch found"); + } + + if (isCancel) { + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); + } + return rowBatch; + } + + public void cancel() { + isCancel = true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java index d54b2e5291..8decad7991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java @@ -20,11 +20,14 @@ package org.apache.doris.qe; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; +import java.util.Optional; + public class PreparedStatementContext { public PrepareCommand command; public ConnectContext ctx; StatementContext statementContext; public String stmtString; + public Optional shortCircuitQueryContext = Optional.empty(); // Timestamp in millisecond last command starts at protected volatile long startTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 471d7cd086..88a3a5c601 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -129,6 +129,7 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_INSTANCE_NUM = "max_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; + public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query"; public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge"; public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = "enable_server_side_prepared_statement"; @@ -631,6 +632,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SPILLING) public boolean enableSpilling = false; + @VariableMgr.VarAttr(name = ENABLE_SHORT_CIRCUIT_QUERY) + public boolean enableShortCircuitQuery = true; + @VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE) public boolean enableExchangeNodeParallelMerge = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java new file mode 100644 index 0000000000..727eee1175 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.Queriable; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.Planner; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TExprList; +import org.apache.doris.thrift.TQueryOptions; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class ShortCircuitQueryContext { + // Cached for better CPU performance, since serialize DescriptorTable and + // outputExprs are heavy work + public final ByteString serializedDescTable; + public final ByteString serializedOutputExpr; + public final ByteString serializedQueryOptions; + + // For prepared statement cached structure, + // there are some pre-calculated structure in Backend TabletFetch service + // using this ID to find for this prepared statement + public final UUID cacheID; + + public final int schemaVersion; + public final OlapTable tbl; + + public final OlapScanNode scanNode; + public final Queriable analzyedQuery; + // Serialized mysql Field, this could avoid serialize mysql field each time sendFields. + // Since, serialize fields is too heavy when table is wide + public Map serializedFields = Maps.newHashMap(); + + + public ShortCircuitQueryContext(Planner planner, Queriable analzyedQuery) throws TException { + this.serializedDescTable = ByteString.copyFrom( + new TSerializer().serialize(planner.getDescTable().toThrift())); + TQueryOptions options = planner.getQueryOptions() != null ? planner.getQueryOptions() : new TQueryOptions(); + this.serializedQueryOptions = ByteString.copyFrom( + new TSerializer().serialize(options)); + List exprs = new ArrayList<>(); + for (Expr expr : planner.getFragments().get(1).getPlanRoot().getProjectList()) { + exprs.add(expr.treeToThrift()); + } + TExprList exprList = new TExprList(exprs); + serializedOutputExpr = ByteString.copyFrom( + new TSerializer().serialize(exprList)); + this.cacheID = UUID.randomUUID(); + this.scanNode = ((OlapScanNode) planner.getScanNodes().get(0)); + this.tbl = this.scanNode.getOlapTable(); + this.schemaVersion = this.tbl.getBaseSchemaVersion(); + this.analzyedQuery = analzyedQuery; + } + + public void sanitize() { + Preconditions.checkNotNull(serializedDescTable); + Preconditions.checkNotNull(serializedOutputExpr); + Preconditions.checkNotNull(cacheID); + Preconditions.checkNotNull(tbl); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index a0ba53f1fe..5bf75b99bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1622,7 +1622,7 @@ public class StmtExecutor { planner.plan(newSelectStmt, context.getSessionVariable().toThrift()); } } - sendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer, cacheResult); + executeAndSendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer, cacheResult); } // Process a select statement. @@ -1704,11 +1704,12 @@ public class StmtExecutor { } } - sendResult(isOutfileQuery, false, queryStmt, channel, null, null); + executeAndSendResult(isOutfileQuery, false, queryStmt, channel, null, null); LOG.info("Query {} finished", DebugUtil.printId(context.queryId)); } - private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel, + public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, + Queriable queryStmt, MysqlChannel channel, CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception { // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, // We will not send real query result to client. Instead, we only send OK to client with @@ -1719,7 +1720,15 @@ public class StmtExecutor { // 2. If this is a query, send the result expr fields first, and send result data back to client. RowBatch batch; CoordInterface coordBase = null; - if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) { + if (statementContext.isShortCircuitQuery()) { + ShortCircuitQueryContext shortCircuitQueryContext = + statementContext.getShortCircuitQueryContext() != null + ? statementContext.getShortCircuitQueryContext() + : new ShortCircuitQueryContext(planner, (Queriable) parsedStmt); + coordBase = new PointQueryExecutor(shortCircuitQueryContext, + context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); + } else if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) { + // this branch is for legacy planner, to be removed coordBase = new PointQueryExec(planner, analyzer, context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); } else { diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out b/regression-test/data/point_query_p0/test_point_query_partition.out index cd22e6c93e..bef064984c 100644 --- a/regression-test/data/point_query_p0/test_point_query_partition.out +++ b/regression-test/data/point_query_p0/test_point_query_partition.out @@ -31,3 +31,18 @@ -- !point_select -- +-- !point_selectxxx -- +686612 686612 686612 \N \N \N \N \N \N \N \N + +-- !point_selectyyy -- +686612 686612 686612 \N \N \N \N \N \N \N \N + +-- !point_selectzzz -- +686612 686612 686612 \N \N \N \N \N \N \N \N + +-- !point_selectmmm -- +686613 686613 686613 \N \N \N \N \N \N \N \N + +-- !point_selecteee -- +686613 686613 686613 \N \N \N \N \N \N \N \N + diff --git a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy index e493d6b036..5c3011f388 100644 --- a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy +++ b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy @@ -18,7 +18,7 @@ import org.codehaus.groovy.runtime.IOGroovyMethods -suite("test_compaction_uniq_keys_row_store") { +suite("test_compaction_uniq_keys_row_store", "nonConcurrent") { def realDb = "regression_test_serving_p0" def tableName = realDb + ".compaction_uniq_keys_row_store_regression_test" sql "CREATE DATABASE IF NOT EXISTS ${realDb}" @@ -36,6 +36,8 @@ suite("test_compaction_uniq_keys_row_store") { stmt.setInt(8, sex) } + sql "set global enable_server_side_prepared_statement = true" + try { String backend_id; def backendId_to_backendIP = [:] @@ -76,7 +78,7 @@ suite("test_compaction_uniq_keys_row_store") { // set server side prepared statment url def url="jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true" def result1 = connect(user=user, password=password, url=url) { - def stmt = prepareStatement """ SELECT * FROM ${tableName} t where user_id = ? and date = ? and datev2 = ? and datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? and sex = ?; """ + def stmt = prepareStatement """ SELECT /*+ SET_VAR(enable_nereids_planner=true,enable_fallback_to_original_planner=false) */ * FROM ${tableName} t where user_id = ? and date = ? and datev2 = ? and datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? and sex = ?; """ setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.21', '2017-10-01 11:11:11.11', 'Beijing', 10, 1 qe_point_select stmt setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.22', '2017-10-01 11:11:11.12', 'Beijing', 10, 1 @@ -211,4 +213,5 @@ suite("test_compaction_uniq_keys_row_store") { } finally { // try_sql("DROP TABLE IF EXISTS ${tableName}") } + sql "set global enable_server_side_prepared_statement = false" } diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy b/regression-test/suites/point_query_p0/test_point_query.groovy index c1a785612f..68df19e662 100644 --- a/regression-test/suites/point_query_p0/test_point_query.groovy +++ b/regression-test/suites/point_query_p0/test_point_query.groovy @@ -30,8 +30,9 @@ suite("test_point_query", "nonConcurrent") { try { set_be_config.call("disable_storage_row_cache", "false") // nereids do not support point query now - sql """set global enable_nereids_planner=false""" - + sql "set global enable_fallback_to_original_planner = false" + sql """set global enable_nereids_planner=true""" + sql "set global enable_server_side_prepared_statement = true" def user = context.config.jdbcUser def password = context.config.jdbcPassword def realDb = "regression_test_serving_p0" @@ -138,7 +139,7 @@ suite("test_point_query", "nonConcurrent") { sql """ INSERT INTO ${tableName} VALUES(298, 120939.11130, "${generateString(298)}", "laooq", "2030-01-02", "2020-01-01 12:36:38", 298, "7022-01-01 11:30:38", 1, 90696620686827832.374, [], []) """ def result1 = connect(user=user, password=password, url=prepare_url) { - def stmt = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = ? and k2 = ? and k3 = ?" + def stmt = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = ? and k2 = ? and k3 = ?" assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); stmt.setInt(1, 1231) stmt.setBigDecimal(2, new BigDecimal("119291.11")) @@ -174,13 +175,14 @@ suite("test_point_query", "nonConcurrent") { qe_point_select stmt stmt.close() - stmt = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = 1235 and k2 = ? and k3 = ?" + stmt = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = ? and k2 = ? and k3 = ?" assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); - stmt.setBigDecimal(1, new BigDecimal("991129292901.11138")) - stmt.setString(2, "dd") + stmt.setInt(1, 1235) + stmt.setBigDecimal(2, new BigDecimal("991129292901.11138")) + stmt.setString(3, "dd") qe_point_select stmt - def stmt_fn = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=false) */ hex(k3), hex(k4) from ${tableName} where k1 = ? and k2 =? and k3 = ?" + def stmt_fn = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) from ${tableName} where k1 = ? and k2 =? and k3 = ?" assertEquals(stmt_fn.class, com.mysql.cj.jdbc.ServerPreparedStatement); stmt_fn.setInt(1, 1231) stmt_fn.setBigDecimal(2, new BigDecimal("119291.11")) @@ -194,8 +196,9 @@ suite("test_point_query", "nonConcurrent") { """ sleep(1); nprep_sql """ INSERT INTO ${tableName} VALUES(1235, 120939.11130, "a ddd", "laooq", "2030-01-02", "2020-01-01 12:36:38", 22.822, "7022-01-01 11:30:38", 1, 1.1111299, [119291.19291], ["111", "222", "333"], 1) """ - stmt.setBigDecimal(1, new BigDecimal("120939.11130")) - stmt.setString(2, "a ddd") + stmt.setBigDecimal(1, 1235) + stmt.setBigDecimal(2, new BigDecimal("120939.11130")) + stmt.setString(3, "a ddd") qe_point_select stmt qe_point_select stmt // invalidate cache @@ -222,9 +225,9 @@ suite("test_point_query", "nonConcurrent") { } // disable useServerPrepStmts def result2 = connect(user=user, password=password, url=context.config.jdbcUrl) { - qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'""" - qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" - qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */ hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" + qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'""" + qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" + qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'""" // prepared text // sql """ prepare stmt1 from select * from ${tableName} where k1 = % and k2 = % and k3 = % """ // qt_sql """execute stmt1 using (1231, 119291.11, 'ddd')""" @@ -253,7 +256,7 @@ suite("test_point_query", "nonConcurrent") { "disable_auto_compaction" = "false" );""" sql """insert into ${tableName} values (0, "1", "2", "3")""" - qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where customer_key = 0""" + qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where customer_key = 0""" } } sql "DROP TABLE IF EXISTS test_ODS_EBA_LLREPORT"; @@ -271,9 +274,11 @@ suite("test_point_query", "nonConcurrent") { ); """ sql "insert into test_ODS_EBA_LLREPORT(RPTNO) values('567890')" - sql "select /*+ SET_VAR(enable_nereids_planner=false) */ substr(RPTNO,2,5) from test_ODS_EBA_LLREPORT where RPTNO = '567890'" + sql "select /*+ SET_VAR(enable_nereids_planner=true) */ substr(RPTNO,2,5) from test_ODS_EBA_LLREPORT where RPTNO = '567890'" } finally { set_be_config.call("disable_storage_row_cache", "true") sql """set global enable_nereids_planner=true""" + sql "set global enable_fallback_to_original_planner = true" + sql "set global enable_server_side_prepared_statement = false" } } \ No newline at end of file diff --git a/regression-test/suites/point_query_p0/test_point_query_partition.groovy b/regression-test/suites/point_query_p0/test_point_query_partition.groovy index 7b5966db0c..459911f25c 100644 --- a/regression-test/suites/point_query_p0/test_point_query_partition.groovy +++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy @@ -17,13 +17,13 @@ import java.math.BigDecimal; -suite("test_point_query_partition") { +suite("test_point_query_partition", "nonConcurrent") { def user = context.config.jdbcUser def password = context.config.jdbcPassword def realDb = "regression_test_serving_p0" def tableName = realDb + ".tbl_point_query_partition" sql "CREATE DATABASE IF NOT EXISTS ${realDb}" - + sql "set global enable_server_side_prepared_statement = true" // Parse url String jdbcUrl = context.config.jdbcUrl String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) @@ -114,4 +114,41 @@ suite("test_point_query_partition") { stmt.setInt(1, 1000) qe_point_select stmt } + + sql "DROP TABLE IF EXISTS regression_test_serving_p0.customer"; + sql """ + CREATE TABLE regression_test_serving_p0.customer ( + `customer_key` BIGINT NULL, + `customer_value_0` TEXT NULL, + `customer_value_1` TEXT NULL, + `customer_value_2` TEXT NULL, + `customer_value_3` TEXT NULL, + `customer_value_4` TEXT NULL, + `customer_value_5` TEXT NULL, + `customer_value_6` TEXT NULL, + `customer_value_7` TEXT NULL, + `customer_value_8` TEXT NULL, + `customer_value_10` TEXT NULL + ) ENGINE=OLAP + UNIQUE KEY(`customer_key`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`customer_key`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "store_row_column" = "true" + ); + """ + sql """insert into regression_test_serving_p0.customer(customer_key, customer_value_0, customer_value_1) values(686612, "686612", "686612")""" + sql """insert into regression_test_serving_p0.customer(customer_key, customer_value_0, customer_value_1) values(686613, "686613", "686613")""" + def result3 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "select /*+ SET_VAR(enable_nereids_planner=true) */ * from regression_test_serving_p0.customer where customer_key = ?" + stmt.setInt(1, 686612) + qe_point_selectxxx stmt + qe_point_selectyyy stmt + qe_point_selectzzz stmt + stmt.setInt(1, 686613) + qe_point_selectmmm stmt + qe_point_selecteee stmt + } + sql "set global enable_server_side_prepared_statement = false" } \ No newline at end of file