From ff7f09fe1f8025114a8d1e2962eacca3e535a9a3 Mon Sep 17 00:00:00 2001 From: HB Date: Tue, 9 Jan 2024 23:31:20 +0800 Subject: [PATCH] [fix](executor) Fe publish topic info tcp leak (#29739) * [fix](executor) Fe publish topic info tcp leak * enhancement --- .../common/publish/TopicPublisherThread.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java index 0a6bed76d0..9af6bd392c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -103,16 +103,29 @@ public class TopicPublisherThread extends MasterDaemon { @Override public void run() { long beginTime = System.currentTimeMillis(); + BackendService.Client client = null; + TNetworkAddress address = null; + boolean ok = false; try { - TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort()); - BackendService.Client client = ClientPool.backendPool.borrowObject(addr); + address = new TNetworkAddress(be.getHost(), be.getBePort()); + client = ClientPool.backendPool.borrowObject(address); client.publishTopicInfo(request); + ok = true; LOG.info("publish topic info to be {} success, time cost={} ms", be.getHost(), (System.currentTimeMillis() - beginTime)); } catch (Exception e) { LOG.warn("publish topic info to be {} error happens: , time cost={} ms", be.getHost(), (System.currentTimeMillis() - beginTime), e); } finally { + try { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } catch (Throwable e) { + LOG.warn("recycle topic publish client failed. related backend[{}]", be.getHost(), e); + } handler.onResponse(be); } }