[fix](rpc) Rebuild failed channel to avoid connection refused (#25688)
This commit is contained in:
@ -27,6 +27,7 @@ import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.ForwardingClientCall;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
@ -64,6 +65,14 @@ public class BackendServiceClient {
|
||||
execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000;
|
||||
}
|
||||
|
||||
// Is the underlying channel in a normal state? (That means the RPC call will not fail immediately)
|
||||
public boolean isNormalState() {
|
||||
ConnectivityState state = channel.getState(false);
|
||||
return state == ConnectivityState.CONNECTING
|
||||
|| state == ConnectivityState.IDLE
|
||||
|| state == ConnectivityState.READY;
|
||||
}
|
||||
|
||||
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
|
||||
InternalService.PExecPlanFragmentRequest request) {
|
||||
return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS)
|
||||
|
||||
@ -111,18 +111,30 @@ public class BackendServiceProxy {
|
||||
private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException {
|
||||
String realIp = NetUtils.getIpByHost(address.getHostname());
|
||||
BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address);
|
||||
if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)) {
|
||||
if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)
|
||||
&& serviceClientExtIp.client.isNormalState()) {
|
||||
return serviceClientExtIp.client;
|
||||
}
|
||||
|
||||
// not exist, create one and return.
|
||||
BackendServiceClient removedClient = null;
|
||||
lock.lock();
|
||||
try {
|
||||
serviceClientExtIp = serviceMap.get(address);
|
||||
if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) {
|
||||
LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp);
|
||||
serviceMap.remove(address);
|
||||
removedClient = serviceClientExtIp.client;
|
||||
serviceClientExtIp = null;
|
||||
}
|
||||
if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) {
|
||||
// At this point we cannot judge the progress of reconnecting the underlying channel.
|
||||
// In the worst case, it may take two minutes. But we can't stand the connection refused
|
||||
// for two minutes, so rebuild the channel directly.
|
||||
serviceMap.remove(address);
|
||||
removedClient = serviceClientExtIp.client;
|
||||
serviceClientExtIp = null;
|
||||
}
|
||||
serviceClientExtIp = serviceMap.get(address);
|
||||
if (serviceClientExtIp == null) {
|
||||
BackendServiceClient client = new BackendServiceClient(address, grpcThreadPool);
|
||||
serviceMap.put(address, new BackendServiceClientExtIp(realIp, client));
|
||||
@ -130,6 +142,9 @@ public class BackendServiceProxy {
|
||||
return serviceMap.get(address).client;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
if (removedClient != null) {
|
||||
removedClient.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user