diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 45a6e3c481..866c0d8546 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -27,6 +27,7 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ConnectivityState; import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; import io.grpc.Metadata; @@ -64,6 +65,14 @@ public class BackendServiceClient { execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000; } + // Is the underlying channel in a normal state? (That means the RPC call will not fail immediately) + public boolean isNormalState() { + ConnectivityState state = channel.getState(false); + return state == ConnectivityState.CONNECTING + || state == ConnectivityState.IDLE + || state == ConnectivityState.READY; + } + public Future execPlanFragmentAsync( InternalService.PExecPlanFragmentRequest request) { return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 55881b4cf9..229391a3dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -111,18 +111,30 @@ public class BackendServiceProxy { private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException { String realIp = NetUtils.getIpByHost(address.getHostname()); BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); - if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)) { + if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp) + && serviceClientExtIp.client.isNormalState()) { return serviceClientExtIp.client; } + // not exist, create one and return. + BackendServiceClient removedClient = null; lock.lock(); try { serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) { LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp); serviceMap.remove(address); + removedClient = serviceClientExtIp.client; + serviceClientExtIp = null; + } + if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) { + // At this point we cannot judge the progress of reconnecting the underlying channel. + // In the worst case, it may take two minutes. But we can't stand the connection refused + // for two minutes, so rebuild the channel directly. + serviceMap.remove(address); + removedClient = serviceClientExtIp.client; + serviceClientExtIp = null; } - serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp == null) { BackendServiceClient client = new BackendServiceClient(address, grpcThreadPool); serviceMap.put(address, new BackendServiceClientExtIp(realIp, client)); @@ -130,6 +142,9 @@ public class BackendServiceProxy { return serviceMap.get(address).client; } finally { lock.unlock(); + if (removedClient != null) { + removedClient.shutdown(); + } } }