diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 0269bee35d..d0d3e4c19f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; public class BackendServiceClient { public static final Logger LOG = LogManager.getLogger(BackendServiceClient.class); - private static final int MAX_RETRY_NUM = 0; + private static final int MAX_RETRY_NUM = 10; private final TNetworkAddress address; private final PBackendServiceGrpc.PBackendServiceFutureStub stub; private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub; @@ -53,6 +53,7 @@ public class BackendServiceClient { this.address = address; channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort()) .flowControlWindow(Config.grpc_max_message_size_bytes) + .keepAliveWithoutCalls(true) .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM) .intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build(); stub = PBackendServiceGrpc.newFutureStub(channel); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 96206416d6..7316782251 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -37,6 +37,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public class BackendServiceProxy { @@ -50,12 +51,24 @@ public class BackendServiceProxy { serviceMap = Maps.newConcurrentMap(); } - private static class SingletonHolder { - private static final BackendServiceProxy INSTANCE = new BackendServiceProxy(); + private static class Holder { + private static final int PROXY_NUM = 20; + private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM]; + private static AtomicInteger count = new AtomicInteger(); + + static { + for (int i = 0; i < proxies.length; i++) { + proxies[i] = new BackendServiceProxy(); + } + } + + static BackendServiceProxy get() { + return proxies[count.addAndGet(1) % PROXY_NUM]; + } } public static BackendServiceProxy getInstance() { - return SingletonHolder.INSTANCE; + return Holder.get(); } public void removeProxy(TNetworkAddress address) {