From de85c57715e63aa98d0c590f033117489ab72696 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Tue, 14 Feb 2023 07:31:31 +0800 Subject: [PATCH] [Improve](point query) support retry different backends in PointQueryExecutor (#16380) --- .../java/org/apache/doris/common/Config.java | 8 +++ .../java/org/apache/doris/qe/Coordinator.java | 5 +- .../org/apache/doris/qe/PointQueryExec.java | 68 ++++++++++++++----- .../java/org/apache/doris/system/Backend.java | 5 ++ 4 files changed, 65 insertions(+), 21 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 40ef72274e..1263c80593 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1065,6 +1065,14 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int max_query_retry_time = 1; + /** + * The number of point query retries in executor. + * A query may retry if we encounter RPC exception and no result has been sent to user. + * You may reduce this number to avoid Avalanche disaster. + */ + @ConfField(mutable = true) + public static int max_point_query_retry_time = 2; + /** * The tryLock timeout configuration of catalog lock. * Normally it does not need to change, unless you need to test something. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4d29d94333..b720538915 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -627,10 +627,7 @@ public class Coordinator { } else { OlapScanNode planRoot = (OlapScanNode) fragments.get(0).getPlanRoot(); Preconditions.checkState(planRoot.getScanTabletIds().size() == 1); - Long backendId = planRoot.getScanBackendIds().iterator().next(); - Backend backend = this.idToBackend.get(backendId); - TNetworkAddress execBeAddr = new TNetworkAddress(backend.getHost(), backend.getBePort()); - pointExec.setAddressAndBackendID(toBrpcHost(execBeAddr), backendId); + pointExec.setCandidateBackends(planRoot.getScanBackendIds()); pointExec.setTabletId(planRoot.getScanTabletIds().get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 919529e226..e62e849be0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -21,14 +21,16 @@ import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.Status; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.KeyTuple; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprList; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultBatch; import org.apache.doris.thrift.TStatusCode; @@ -40,6 +42,9 @@ import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -50,7 +55,6 @@ import java.util.concurrent.TimeoutException; public class PointQueryExec { private static final Logger LOG = LogManager.getLogger(PointQueryExec.class); - private TNetworkAddress address; // SlotRef sorted by column id private Map equalPredicats; // ByteString serialized for prepared statement @@ -63,22 +67,31 @@ public class PointQueryExec { private boolean isCancel = false; private boolean isBinaryProtocol = false; - private long backendID; + + private List candidateBackends; + // For parepared statement cached structure, // there are some pre caculated structure in Backend TabletFetch service // using this ID to find for this prepared statement private UUID cacheID; public PointQueryExec(Map equalPredicats, DescriptorTable descTable, - ArrayList outputExprs) { + ArrayList outputExprs) { this.equalPredicats = equalPredicats; this.descriptorTable = descTable; this.outputExprs = outputExprs; } - void setAddressAndBackendID(TNetworkAddress addr, long backendID) { - this.address = addr; - this.backendID = backendID; + void setCandidateBackends(HashSet backendsIds) { + candidateBackends = new ArrayList<>(); + for (Long backendID : backendsIds) { + Backend backend = Env.getCurrentSystemInfo().getBackend(backendID); + if (SimpleScheduler.isAvailable(backend)) { + candidateBackends.add(backend); + } + } + // Random read replicas + Collections.shuffle(this.candidateBackends); } public void setSerializedDescTable(ByteString serializedDescTable) { @@ -106,7 +119,7 @@ public class PointQueryExec { } void addKeyTuples( - InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { + InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { // TODO handle IN predicates KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); for (Expr expr : equalPredicats.values()) { @@ -117,6 +130,26 @@ public class PointQueryExec { } public RowBatch getNext(Status status) throws TException { + Iterator backendIter = candidateBackends.iterator(); + RowBatch rowBatch = null; + int tryCount = 0; + int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size()); + do { + Backend backend = backendIter.next(); + rowBatch = getNextInternal(status, backend); + ++tryCount; + if (rowBatch != null) { + break; + } + if (tryCount >= maxTry) { + break; + } + status.setStatus(Status.OK); + } while (true); + return rowBatch; + } + + private RowBatch getNextInternal(Status status, Backend backend) throws TException { long timeoutTs = System.currentTimeMillis() + timeoutMs; RowBatch rowBatch = new RowBatch(); InternalService.PTabletKeyLookupResponse pResult = null; @@ -148,13 +181,14 @@ public class PointQueryExec { requestBuilder.setUuid(uuidBuilder); } addKeyTuples(requestBuilder); - InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); - Future futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(address, request); + while (pResult == null) { + InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); + Future futureResponse = + BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request); long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { - LOG.warn("fetch result timeout {}", address); + LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); status.setStatus("query timeout"); return null; } @@ -170,22 +204,22 @@ public class PointQueryExec { } } } catch (RpcException e) { - LOG.warn("fetch result rpc exception {}", address); + LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress()); status.setRpcStatus(e.getMessage()); - SimpleScheduler.addToBlacklist(backendID, e.getMessage()); + SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { - LOG.warn("fetch result execution exception {}", address); + LOG.warn("fetch result execution exception {}", backend.getBrpcAdress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); } else { status.setRpcStatus(e.getMessage()); - SimpleScheduler.addToBlacklist(backendID, e.getMessage()); + SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); } return null; } catch (TimeoutException e) { - LOG.warn("fetch result timeout {}", address); + LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); status.setStatus("query timeout"); return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b8b29c7e44..a78d1d6d32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -30,6 +30,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; @@ -782,6 +783,10 @@ public class Backend implements Writable { return tagMap; } + public TNetworkAddress getBrpcAdress() { + return new TNetworkAddress(getHost(), getBrpcPort()); + } + public String getTagMapString() { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; }