[Feature](Point Query) fully support in nereids #35823 (#36205)

This commit is contained in:
lihangyu
2024-06-13 08:37:31 +08:00
committed by GitHub
parent 3a3c8cd9ee
commit 226775f059
19 changed files with 684 additions and 30 deletions

View File

@ -107,6 +107,7 @@ public class NereidsPlanner extends Planner {
@Override
public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions queryOptions) {
this.queryOptions = queryOptions;
if (statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
NereidsTracer.init();
} else {

View File

@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
@ -122,8 +123,8 @@ public class StatementContext implements Closeable {
// generate for next id for prepared statement's placeholders, which is connection level
private final IdGenerator<PlaceholderId> placeHolderIdGenerator = PlaceholderId.createGenerator();
// relation id to placeholders for prepared statement
private final Map<PlaceholderId, Expression> idToPlaceholderRealExpr = new HashMap<>();
// relation id to placeholders for prepared statement, ordered by placeholder id
private final Map<PlaceholderId, Expression> idToPlaceholderRealExpr = new TreeMap<>();
// collect all hash join conditions to compute node connectivity in join graph
private final List<Expression> joinFilters = new ArrayList<>();
@ -164,6 +165,12 @@ public class StatementContext implements Closeable {
// form this map
private final Map<RelationId, Statistics> relationIdToStatisticsMap = new LinkedHashMap<>();
// Indicates the query is short-circuited in both plan and execution phase, typically
// for high speed/concurrency point queries
private boolean isShortCircuitQuery;
private ShortCircuitQueryContext shortCircuitQueryContext;
public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@ -235,6 +242,22 @@ public class StatementContext implements Closeable {
}
}
public boolean isShortCircuitQuery() {
return isShortCircuitQuery;
}
public void setShortCircuitQuery(boolean shortCircuitQuery) {
isShortCircuitQuery = shortCircuitQuery;
}
public ShortCircuitQueryContext getShortCircuitQueryContext() {
return shortCircuitQueryContext;
}
public void setShortCircuitQueryContext(ShortCircuitQueryContext shortCircuitQueryContext) {
this.shortCircuitQueryContext = shortCircuitQueryContext;
}
public Optional<SqlCacheContext> getSqlCacheContext() {
return Optional.ofNullable(sqlCacheContext);
}

View File

@ -82,6 +82,7 @@ import org.apache.doris.nereids.rules.rewrite.InferPredicates;
import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct;
import org.apache.doris.nereids.rules.rewrite.InlineLogicalView;
import org.apache.doris.nereids.rules.rewrite.LimitSortToTopN;
import org.apache.doris.nereids.rules.rewrite.LogicalResultSinkToShortCircuitPointQuery;
import org.apache.doris.nereids.rules.rewrite.MergeAggregate;
import org.apache.doris.nereids.rules.rewrite.MergeFilters;
import org.apache.doris.nereids.rules.rewrite.MergeOneRowRelationIntoUnion;
@ -398,6 +399,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
topic("Point query short circuit",
topDown(new LogicalResultSinkToShortCircuitPointQuery())),
topic("eliminate",
// SORT_PRUNING should be applied after mergeLimit
custom(RuleType.ELIMINATE_SORT, EliminateSort::new),

View File

@ -340,6 +340,8 @@ public enum RuleType {
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_PROJECT_SCAN(RuleTypeClass.REWRITE),
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_FILTER_SCAN(RuleTypeClass.REWRITE),
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_AGG_SCAN(RuleTypeClass.REWRITE),
// short circuit rule
SHOR_CIRCUIT_POINT_QUERY(RuleTypeClass.REWRITE),
// exploration rules
REORDER_INTERSECT(RuleTypeClass.EXPLORATION),
TEST_EXPLORATION(RuleTypeClass.EXPLORATION),

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
/**
* short circuit query optimization
* pattern : select xxx from tbl where key = ?
*/
public class LogicalResultSinkToShortCircuitPointQuery implements RewriteRuleFactory {
private Expression removeCast(Expression expression) {
if (expression instanceof Cast) {
return expression.child(0);
}
return expression;
}
private boolean filterMatchShortCircuitCondition(LogicalFilter<LogicalOlapScan> filter) {
return filter.getConjuncts().stream().allMatch(
// all conjuncts match with pattern `key = ?`
expression -> (expression instanceof EqualTo)
&& (removeCast(expression.child(0)).isKeyColumnFromTable()
|| ((SlotReference) expression.child(0)).getName().equals(Column.DELETE_SIGN))
&& expression.child(1).isLiteral());
}
private boolean scanMatchShortCircuitCondition(LogicalOlapScan olapScan) {
if (!ConnectContext.get().getSessionVariable().enableShortCircuitQuery) {
return false;
}
OlapTable olapTable = olapScan.getTable();
return olapTable.getEnableLightSchemaChange() && olapTable.getEnableUniqueKeyMergeOnWrite()
&& olapTable.storeRowColumn();
}
// set short circuit flag and return the original plan
private Plan shortCircuit(Plan root, OlapTable olapTable,
Set<Expression> conjuncts, StatementContext statementContext) {
// All key columns in conjuncts
Set<String> colNames = Sets.newHashSet();
for (Expression expr : conjuncts) {
colNames.add(((SlotReference) removeCast((expr.child(0)))).getName());
}
// set short circuit flag and modify nothing to the plan
if (olapTable.getBaseSchemaKeyColumns().size() <= colNames.size()) {
statementContext.setShortCircuitQuery(true);
}
return root;
}
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.SHOR_CIRCUIT_POINT_QUERY.build(
logicalResultSink(logicalProject(logicalFilter(logicalOlapScan()
.when(this::scanMatchShortCircuitCondition)
).when(this::filterMatchShortCircuitCondition)))
.thenApply(ctx -> {
return shortCircuit(ctx.root, ctx.root.child().child().child().getTable(),
ctx.root.child().child().getConjuncts(), ctx.statementContext);
})),
RuleType.SHOR_CIRCUIT_POINT_QUERY.build(
logicalResultSink(logicalFilter(logicalOlapScan()
.when(this::scanMatchShortCircuitCondition)
).when(this::filterMatchShortCircuitCondition))
.thenApply(ctx -> {
return shortCircuit(ctx.root, ctx.root.child().child().getTable(),
ctx.root.child().getConjuncts(), ctx.statementContext);
}))
);
}
}

View File

@ -390,6 +390,11 @@ public abstract class Expression extends AbstractTreeNode<Expression> implements
return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent();
}
public boolean isKeyColumnFromTable() {
return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent()
&& ((SlotReference) this).getColumn().get().isKey();
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -539,7 +539,7 @@ public abstract class Literal extends Expression implements LeafExpression, Comp
microsecond = data.getInt();
}
if (Config.enable_date_conversion) {
return new DateTimeV2Literal(year, month, day, hour, minute, second, microsecond);
return new DateTimeV2Literal(DateTimeV2Type.MAX, year, month, day, hour, minute, second, microsecond);
}
return new DateTimeLiteral(DateTimeType.INSTANCE, year, month, day, hour, minute, second, microsecond);
} else {

View File

@ -23,7 +23,7 @@ import org.apache.doris.common.IdGenerator;
/**
* placeholder id for prepared statement parameters
*/
public class PlaceholderId extends Id<PlaceholderId> {
public class PlaceholderId extends Id<PlaceholderId> implements Comparable<PlaceholderId> {
public PlaceholderId(int id) {
super(id);
@ -55,4 +55,9 @@ public class PlaceholderId extends Id<PlaceholderId> {
public int hashCode() {
return super.hashCode();
}
@Override
public int compareTo(PlaceholderId o) {
return this.id - o.id;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@ -24,10 +25,13 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.PointQueryExecutor;
import org.apache.doris.qe.PreparedStatementContext;
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -65,8 +69,26 @@ public class ExecuteCommand extends Command {
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(prepareCommand.getLogicalPlan(), executor.getContext()
.getStatementContext());
executor.setParsedStmt(planAdapter);
// execute real statement
executor.execute();
// If it's not a short circuit query or schema version is different(indicates schema changed),
// need to do reanalyze and plan
boolean needAnalyze = !executor.getContext().getStatementContext().isShortCircuitQuery()
|| (preparedStmtCtx.shortCircuitQueryContext.isPresent()
&& preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion()
!= preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion);
if (needAnalyze) {
// execute real statement
preparedStmtCtx.shortCircuitQueryContext = Optional.empty();
statementContext.setShortCircuitQueryContext(null);
executor.execute();
if (executor.getContext().getStatementContext().isShortCircuitQuery()) {
// cache short-circuit plan
preparedStmtCtx.shortCircuitQueryContext = Optional.of(
new ShortCircuitQueryContext(executor.planner(), (Queriable) executor.getParsedStmt()));
statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get());
}
return;
}
PointQueryExecutor.directExecuteShortCircuitQuery(executor, preparedStmtCtx, statementContext);
}
/**

View File

@ -1122,7 +1122,8 @@ public class OlapScanNode extends ScanNode {
public boolean isPointQuery() {
return this.pointQueryEqualPredicats != null
|| (preparedStatment != null && preparedStatment.isPointQueryShortCircuit());
|| (preparedStatment != null && preparedStatment.isPointQueryShortCircuit())
|| ConnectContext.get().getStatementContext().isShortCircuitQuery();
}
private void computeTabletInfo() throws UserException {
@ -1250,6 +1251,7 @@ public class OlapScanNode extends ScanNode {
scanTabletIds.clear();
bucketSeq2locations.clear();
scanReplicaIds.clear();
sampleTabletIds.clear();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
@ -1350,7 +1352,7 @@ public class OlapScanNode extends ScanNode {
output.append(prefix).append("pushAggOp=").append(pushDownAggNoGroupingOp).append("\n");
}
if (isPointQuery()) {
output.append(prefix).append("SHORT-CIRCUIT");
output.append(prefix).append("SHORT-CIRCUIT\n");
}
if (!CollectionUtils.isEmpty(rewrittenProjectList)) {

View File

@ -0,0 +1,319 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.KeyTuple;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class PointQueryExecutor implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(PointQueryExecutor.class);
private long tabletID = 0;
private long timeoutMs = Config.point_query_timeout_ms; // default 10s
private boolean isCancel = false;
private List<Backend> candidateBackends;
private final int maxMsgSizeOfResultReceiver;
// used for snapshot read in cloud mode
private List<Long> snapshotVisibleVersions;
private final ShortCircuitQueryContext shortCircuitQueryContext;
public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) {
ctx.sanitize();
this.shortCircuitQueryContext = ctx;
this.maxMsgSizeOfResultReceiver = maxMessageSize;
}
void setScanRangeLocations() throws Exception {
OlapScanNode scanNode = shortCircuitQueryContext.scanNode;
// compute scan range
List<TScanRangeLocations> locations = scanNode.lazyEvaluateRangeLocations();
Preconditions.checkNotNull(locations);
if (scanNode.getScanTabletIds().isEmpty()) {
return;
}
Preconditions.checkState(scanNode.getScanTabletIds().size() == 1);
this.tabletID = scanNode.getScanTabletIds().get(0);
candidateBackends = new ArrayList<>();
for (Long backendID : scanNode.getScanBackendIds()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendID);
if (SimpleScheduler.isAvailable(backend)) {
candidateBackends.add(backend);
}
}
// Random read replicas
Collections.shuffle(this.candidateBackends);
if (LOG.isDebugEnabled()) {
LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID);
}
}
// execute query without analyze & plan
public static void directExecuteShortCircuitQuery(StmtExecutor executor,
PreparedStatementContext preparedStmtCtx,
StatementContext statementContext) throws Exception {
Preconditions.checkNotNull(preparedStmtCtx.shortCircuitQueryContext);
ShortCircuitQueryContext shortCircuitQueryContext = preparedStmtCtx.shortCircuitQueryContext.get();
// update conjuncts
List<Expr> conjunctVals = statementContext.getIdToPlaceholderRealExpr().values().stream().map(
expression -> (
(Literal) expression).toLegacyLiteral())
.collect(Collectors.toList());
if (conjunctVals.size() != preparedStmtCtx.command.placeholderCount()) {
throw new AnalysisException("Mismatched conjuncts values size with prepared"
+ "statement parameters size, expected "
+ preparedStmtCtx.command.placeholderCount()
+ ", but meet " + conjunctVals.size());
}
updateScanNodeConjuncts(shortCircuitQueryContext.scanNode, conjunctVals);
// short circuit plan and execution
executor.executeAndSendResult(false, false,
shortCircuitQueryContext.analzyedQuery, executor.getContext()
.getMysqlChannel(), null, null);
}
private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) {
for (int i = 0; i < conjunctVals.size(); ++i) {
BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i);
if (binaryPredicate.getChild(0) instanceof LiteralExpr) {
binaryPredicate.setChild(0, conjunctVals.get(i));
} else if (binaryPredicate.getChild(1) instanceof LiteralExpr) {
binaryPredicate.setChild(1, conjunctVals.get(i));
} else {
Preconditions.checkState(false, "Should conatains literal in " + binaryPredicate.toSqlImpl());
}
}
}
public void setTimeout(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
void addKeyTuples(
InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
// TODO handle IN predicates
KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) {
BinaryPredicate predicate = (BinaryPredicate) expr;
kBuilder.addKeyColumnRep(predicate.getChild(1).getStringValue());
}
requestBuilder.addKeyTuples(kBuilder);
}
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
// Do nothing
}
@Override
public RowBatch getNext() throws Exception {
setScanRangeLocations();
// No partition/tablet found return emtpy row batch
if (candidateBackends == null || candidateBackends.isEmpty()) {
return new RowBatch();
}
Iterator<Backend> backendIter = candidateBackends.iterator();
RowBatch rowBatch = null;
int tryCount = 0;
int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size());
Status status = new Status();
do {
Backend backend = backendIter.next();
rowBatch = getNextInternal(status, backend);
if (rowBatch != null) {
break;
}
if (++tryCount >= maxTry) {
break;
}
} while (true);
// handle status code
if (!status.ok()) {
if (Strings.isNullOrEmpty(status.getErrorMsg())) {
status.rewriteErrorMsg();
}
String errMsg = status.getErrorMsg();
LOG.warn("query failed: {}", errMsg);
if (status.isRpcError()) {
throw new RpcException(null, errMsg);
} else {
// hide host info
int hostIndex = errMsg.indexOf("host");
if (hostIndex != -1) {
errMsg = errMsg.substring(0, hostIndex);
}
throw new UserException(errMsg);
}
}
return rowBatch;
}
@Override
public void exec() throws Exception {
// Point queries don't need to do anthing in execution phase.
// only handles in getNext()
}
private RowBatch getNextInternal(Status status, Backend backend) throws TException {
long timeoutTs = System.currentTimeMillis() + timeoutMs;
RowBatch rowBatch = new RowBatch();
InternalService.PTabletKeyLookupResponse pResult = null;
try {
Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable);
InternalService.PTabletKeyLookupRequest.Builder requestBuilder
= InternalService.PTabletKeyLookupRequest.newBuilder()
.setTabletId(tabletID)
.setDescTbl(shortCircuitQueryContext.serializedDescTable)
.setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
.setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
.setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE);
if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) {
requestBuilder.setVersion(snapshotVisibleVersions.get(0));
}
if (shortCircuitQueryContext.cacheID != null) {
InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder();
uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
requestBuilder.setUuid(uuidBuilder);
}
addKeyTuples(requestBuilder);
InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
Future<InternalService.PTabletKeyLookupResponse> futureResponse =
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request);
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request timeout");
return null;
}
try {
pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue to get result
LOG.warn("future get interrupted Exception");
if (isCancel) {
status.updateStatus(TStatusCode.CANCELLED, "cancelled");
return null;
}
} catch (TimeoutException e) {
futureResponse.cancel(true);
LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress());
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch result timeout");
return null;
}
} catch (RpcException e) {
LOG.warn("query fetch rpc exception {}, e {}", backend.getBrpcAddress(), e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
return null;
} catch (ExecutionException e) {
LOG.warn("query fetch execution exception {}, addr {}", e, backend.getBrpcAddress());
if (e.getMessage().contains("time out")) {
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
} else {
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
}
return null;
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
return null;
}
if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
LOG.debug("get empty rowbatch");
rowBatch.setEos(true);
status.updateStatus(TStatusCode.OK, "");
return rowBatch;
} else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
TDeserializer deserializer = new TDeserializer(
new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
try {
deserializer.deserialize(resultBatch, serialResult);
} catch (TException e) {
if (e.getMessage().contains("MaxMessageSize reached")) {
throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
} else {
throw e;
}
}
rowBatch.setBatch(resultBatch);
rowBatch.setEos(true);
status.updateStatus(TStatusCode.OK, "");
return rowBatch;
} else {
Preconditions.checkState(false, "No row batch or empty batch found");
}
if (isCancel) {
status.updateStatus(TStatusCode.CANCELLED, "cancelled");
}
return rowBatch;
}
public void cancel() {
isCancel = true;
}
}

View File

@ -20,11 +20,14 @@ package org.apache.doris.qe;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import java.util.Optional;
public class PreparedStatementContext {
public PrepareCommand command;
public ConnectContext ctx;
StatementContext statementContext;
public String stmtString;
public Optional<ShortCircuitQueryContext> shortCircuitQueryContext = Optional.empty();
// Timestamp in millisecond last command starts at
protected volatile long startTime;

View File

@ -129,6 +129,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String MAX_INSTANCE_NUM = "max_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query";
public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = "enable_server_side_prepared_statement";
@ -631,6 +632,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
@VariableMgr.VarAttr(name = ENABLE_SHORT_CIRCUIT_QUERY)
public boolean enableShortCircuitQuery = true;
@VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
public boolean enableExchangeNodeParallelMerge = false;

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprList;
import org.apache.doris.thrift.TQueryOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class ShortCircuitQueryContext {
// Cached for better CPU performance, since serialize DescriptorTable and
// outputExprs are heavy work
public final ByteString serializedDescTable;
public final ByteString serializedOutputExpr;
public final ByteString serializedQueryOptions;
// For prepared statement cached structure,
// there are some pre-calculated structure in Backend TabletFetch service
// using this ID to find for this prepared statement
public final UUID cacheID;
public final int schemaVersion;
public final OlapTable tbl;
public final OlapScanNode scanNode;
public final Queriable analzyedQuery;
// Serialized mysql Field, this could avoid serialize mysql field each time sendFields.
// Since, serialize fields is too heavy when table is wide
public Map<String, byte[]> serializedFields = Maps.newHashMap();
public ShortCircuitQueryContext(Planner planner, Queriable analzyedQuery) throws TException {
this.serializedDescTable = ByteString.copyFrom(
new TSerializer().serialize(planner.getDescTable().toThrift()));
TQueryOptions options = planner.getQueryOptions() != null ? planner.getQueryOptions() : new TQueryOptions();
this.serializedQueryOptions = ByteString.copyFrom(
new TSerializer().serialize(options));
List<TExpr> exprs = new ArrayList<>();
for (Expr expr : planner.getFragments().get(1).getPlanRoot().getProjectList()) {
exprs.add(expr.treeToThrift());
}
TExprList exprList = new TExprList(exprs);
serializedOutputExpr = ByteString.copyFrom(
new TSerializer().serialize(exprList));
this.cacheID = UUID.randomUUID();
this.scanNode = ((OlapScanNode) planner.getScanNodes().get(0));
this.tbl = this.scanNode.getOlapTable();
this.schemaVersion = this.tbl.getBaseSchemaVersion();
this.analzyedQuery = analzyedQuery;
}
public void sanitize() {
Preconditions.checkNotNull(serializedDescTable);
Preconditions.checkNotNull(serializedOutputExpr);
Preconditions.checkNotNull(cacheID);
Preconditions.checkNotNull(tbl);
}
}

View File

@ -1622,7 +1622,7 @@ public class StmtExecutor {
planner.plan(newSelectStmt, context.getSessionVariable().toThrift());
}
}
sendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer, cacheResult);
executeAndSendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer, cacheResult);
}
// Process a select statement.
@ -1704,11 +1704,12 @@ public class StmtExecutor {
}
}
sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
executeAndSendResult(isOutfileQuery, false, queryStmt, channel, null, null);
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
}
private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel,
public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
Queriable queryStmt, MysqlChannel channel,
CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception {
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
// We will not send real query result to client. Instead, we only send OK to client with
@ -1719,7 +1720,15 @@ public class StmtExecutor {
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
CoordInterface coordBase = null;
if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) {
if (statementContext.isShortCircuitQuery()) {
ShortCircuitQueryContext shortCircuitQueryContext =
statementContext.getShortCircuitQueryContext() != null
? statementContext.getShortCircuitQueryContext()
: new ShortCircuitQueryContext(planner, (Queriable) parsedStmt);
coordBase = new PointQueryExecutor(shortCircuitQueryContext,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) {
// this branch is for legacy planner, to be removed
coordBase = new PointQueryExec(planner, analyzer,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {