From b26d8f284c559f1ed024d3bfbc52f8a06ea08d19 Mon Sep 17 00:00:00 2001 From: ZenoYang Date: Wed, 2 Nov 2022 19:39:33 +0800 Subject: [PATCH] [fix](rpc) The proxy removed when rpc exception occurs is not an abnormal proxy (#13836) `BackendServiceProxy.getInstance()` uses a round-robin strategy to pick a proxy, so each call may return a different instance. As a result, when the current RPC request fails, the proxy on which `BackendServiceProxy.getInstance().removeProxy(...)` is invoked is not necessarily the proxy that actually sent the failed request. --- .../java/org/apache/doris/qe/Coordinator.java | 50 +++++++++++-------- .../apache/doris/rpc/BackendServiceProxy.java | 2 +- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8b23940ff4..c066c174d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -109,6 +109,8 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -645,8 +647,8 @@ public class Coordinator { } // end for fragments // 4. 
send and wait fragments rpc - List>> futures = - Lists.newArrayList(); + List>> + futures = Lists.newArrayList(); Context parentSpanContext = Context.current(); for (BackendExecStates states : beToExecStates.values()) { Span span = Telemetry.getNoopSpan(); @@ -656,7 +658,8 @@ public class Coordinator { } states.scopedSpan = new ScopedSpan(span); states.unsetFields(); - futures.add(Pair.of(states, states.execRemoteFragmentsAsync())); + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy))); } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); @@ -670,7 +673,8 @@ public class Coordinator { .setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan(); } states.scopedSpan = new ScopedSpan(span); - futures.add(Pair.of(states, states.execPlanFragmentStartAsync())); + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy))); } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); } @@ -681,7 +685,8 @@ public class Coordinator { } } - private void waitRpc(List>> futures, long leftTimeMs, + private void waitRpc(List>> futures, + long leftTimeMs, String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { throw new UserException("timeout before waiting for " + operation + " RPC. 
Elapse(sec): " + ( @@ -689,25 +694,25 @@ public class Coordinator { } long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); - for (Pair> pair : futures) { + for (Triple> triple : futures) { TStatusCode code; String errMsg = null; Exception exception = null; - Span span = pair.first.scopedSpan.getSpan(); + Span span = triple.getLeft().scopedSpan.getSpan(); try { - PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS); + PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS); code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { if (!result.getStatus().getErrorMsgsList().isEmpty()) { errMsg = result.getStatus().getErrorMsgsList().get(0); } else { - errMsg = operation + " failed. backend id: " + pair.first.beId; + errMsg = operation + " failed. backend id: " + triple.getLeft().beId; } } } catch (ExecutionException e) { exception = e; code = TStatusCode.THRIFT_RPC_ERROR; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr); + triple.getMiddle().removeProxy(triple.getLeft().brpcAddr); } catch (InterruptedException e) { exception = e; code = TStatusCode.INTERNAL_ERROR; @@ -726,12 +731,14 @@ public class Coordinator { cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: - MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L); - throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname) + .increase(1L); + throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception); case THRIFT_RPC_ERROR: - MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L); - SimpleScheduler.addToBlacklist(pair.first.beId, errMsg); - throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + 
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname) + .increase(1L); + SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg); + throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception); default: throw new UserException(errMsg, exception); } @@ -740,7 +747,7 @@ public class Coordinator { span.recordException(e); throw e; } finally { - pair.first.scopedSpan.endSpan(); + triple.getLeft().scopedSpan.endSpan(); } } } @@ -2208,15 +2215,15 @@ public class Coordinator { } } - public Future execRemoteFragmentsAsync() throws TException { + public Future execRemoteFragmentsAsync(BackendServiceProxy proxy) + throws TException { try { TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); for (BackendExecState state : states) { state.initiated = true; paramsList.addToParamsList(state.rpcParams); } - return BackendServiceProxy.getInstance() - .execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution); + return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. @@ -2224,12 +2231,13 @@ public class Coordinator { } } - public Future execPlanFragmentStartAsync() throws TException { + public Future execPlanFragmentStartAsync(BackendServiceProxy proxy) + throws TException { try { PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder(); PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); builder.setQueryId(qid); - return BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, builder.build()); + return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build()); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index c58defa488..466a7b1525 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -64,7 +64,7 @@ public class BackendServiceProxy { } static BackendServiceProxy get() { - return proxies[count.addAndGet(1) % PROXY_NUM]; + return proxies[Math.abs(count.addAndGet(1) % PROXY_NUM)]; } }