From b4e82d23229e8ba0073f09543dbab249d4d0c529 Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 25 Jul 2024 22:11:23 +0800 Subject: [PATCH] =?UTF-8?q?[Improvement](rpc)=20set=20grpc=20channel's=20k?= =?UTF-8?q?eepAliveTime=20and=20remove=20proxy=20=E2=80=A6=20(#38381)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Full title: [Improvement](rpc) set grpc channel's keepAliveTime and remove proxy on InterruptedException/TimeoutException (#37304) ## Proposed changes 1. Set the gRPC channel's keepAliveTime, controlled by the new config `grpc_keep_alive_second` (default 10 seconds). 2. Remove the backend proxy on InterruptedException/TimeoutException, to avoid keeping an unavailable channel. Picked from #37304. ## Proposed changes Issue Number: close #xxx --- .../src/main/java/org/apache/doris/common/Config.java | 8 ++++++++ .../src/main/java/org/apache/doris/qe/Coordinator.java | 2 ++ .../java/org/apache/doris/rpc/BackendServiceClient.java | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 441a6a7c46..5803468e8b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1464,6 +1464,14 @@ public class Config extends ConfigBase { @ConfField public static int grpc_threadmgr_threads_nums = 4096; + /** + * sets the time without read activity before sending a keepalive ping + * the smaller the value, the sooner the channel is unavailable, but it will increase network io + */ + @ConfField(description = { "设置grpc连接发送 keepalive ping 之前没有数据传输的时间。", + "The time without grpc read activity before sending a keepalive ping" }) + public static int grpc_keep_alive_second = 10; + /** * Used to set minimal number of replication per tablet. 
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a4c2678658..9e22f853c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1219,6 +1219,7 @@ public class Coordinator implements CoordInterface { } catch (InterruptedException e) { exception = e; code = TStatusCode.INTERNAL_ERROR; + triple.getMiddle().removeProxy(triple.getLeft().brpcAddr); } catch (TimeoutException e) { exception = e; errMsg = String.format( @@ -1226,6 +1227,7 @@ public class Coordinator implements CoordInterface { operation, queryOptions.getExecutionTimeout(), timeoutMs / 1000); LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg); code = TStatusCode.TIMEOUT; + triple.getMiddle().removeProxy(triple.getLeft().brpcAddr); } if (code != TStatusCode.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 3b7780591d..924955e662 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -46,7 +46,7 @@ public class BackendServiceClient { public BackendServiceClient(TNetworkAddress address, Executor executor) { this.address = address; channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort()) - .executor(executor) + .executor(executor).keepAliveTime(Config.grpc_keep_alive_second, TimeUnit.SECONDS) .flowControlWindow(Config.grpc_max_message_size_bytes) .keepAliveWithoutCalls(true) .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)