[fix](grpc-java) use pooled stub to call rpc on be instead of one stub (#10439)

A channel is closed when a timeout or exception happens, if only
one stub is used, then all query would fail.

If we dont close the channel, sometimes grpc-java stuck without sending
any rpc.
This commit is contained in:
Yongqiang YANG
2022-09-14 22:30:45 +08:00
committed by GitHub
parent 0ead048b93
commit be0a0200cf
2 changed files with 18 additions and 4 deletions

View File

@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public class BackendServiceClient {
public static final Logger LOG = LogManager.getLogger(BackendServiceClient.class);
private static final int MAX_RETRY_NUM = 0;
private static final int MAX_RETRY_NUM = 10;
private final TNetworkAddress address;
private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
@ -53,6 +53,7 @@ public class BackendServiceClient {
this.address = address;
channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
.flowControlWindow(Config.grpc_max_message_size_bytes)
.keepAliveWithoutCalls(true)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
.intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build();
stub = PBackendServiceGrpc.newFutureStub(channel);

View File

@ -37,6 +37,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class BackendServiceProxy {
@ -50,12 +51,24 @@ public class BackendServiceProxy {
serviceMap = Maps.newConcurrentMap();
}
private static class SingletonHolder {
private static final BackendServiceProxy INSTANCE = new BackendServiceProxy();
private static class Holder {
private static final int PROXY_NUM = 20;
private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM];
private static AtomicInteger count = new AtomicInteger();
static {
for (int i = 0; i < proxies.length; i++) {
proxies[i] = new BackendServiceProxy();
}
}
static BackendServiceProxy get() {
return proxies[count.addAndGet(1) % PROXY_NUM];
}
}
public static BackendServiceProxy getInstance() {
return SingletonHolder.INSTANCE;
return Holder.get();
}
public void removeProxy(TNetworkAddress address) {