From be0a0200cf4888c76b9a09dcd206f471e9c49f27 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 14 Sep 2022 22:30:45 +0800 Subject: [PATCH] [fix](grpc-java) use pooled stub to call rpc on be instead of one stub (#10439) A channel is closed when a timeout or exception happens, if only one stub is used, then all query would fail. If we dont close the channel, sometimes grpc-java stuck without sending any rpc. --- .../doris/rpc/BackendServiceClient.java | 3 ++- .../apache/doris/rpc/BackendServiceProxy.java | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 0269bee35d..d0d3e4c19f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; public class BackendServiceClient { public static final Logger LOG = LogManager.getLogger(BackendServiceClient.class); - private static final int MAX_RETRY_NUM = 0; + private static final int MAX_RETRY_NUM = 10; private final TNetworkAddress address; private final PBackendServiceGrpc.PBackendServiceFutureStub stub; private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub; @@ -53,6 +53,7 @@ public class BackendServiceClient { this.address = address; channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort()) .flowControlWindow(Config.grpc_max_message_size_bytes) + .keepAliveWithoutCalls(true) .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM) .intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build(); stub = PBackendServiceGrpc.newFutureStub(channel); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 96206416d6..7316782251 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -37,6 +37,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public class BackendServiceProxy { @@ -50,12 +51,24 @@ public class BackendServiceProxy { serviceMap = Maps.newConcurrentMap(); } - private static class SingletonHolder { - private static final BackendServiceProxy INSTANCE = new BackendServiceProxy(); + private static class Holder { + private static final int PROXY_NUM = 20; + private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM]; + private static AtomicInteger count = new AtomicInteger(); + + static { + for (int i = 0; i < proxies.length; i++) { + proxies[i] = new BackendServiceProxy(); + } + } + + static BackendServiceProxy get() { + return proxies[count.addAndGet(1) % PROXY_NUM]; + } } public static BackendServiceProxy getInstance() { - return SingletonHolder.INSTANCE; + return Holder.get(); } public void removeProxy(TNetworkAddress address) {