[fix](executor) Fe publish topic info tcp leak (#29739)
* [fix](executor) Fe publish topic info tcp leak * enhancement
This commit is contained in:
@ -103,16 +103,29 @@ public class TopicPublisherThread extends MasterDaemon {
|
||||
@Override
|
||||
public void run() {
|
||||
long beginTime = System.currentTimeMillis();
|
||||
BackendService.Client client = null;
|
||||
TNetworkAddress address = null;
|
||||
boolean ok = false;
|
||||
try {
|
||||
TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort());
|
||||
BackendService.Client client = ClientPool.backendPool.borrowObject(addr);
|
||||
address = new TNetworkAddress(be.getHost(), be.getBePort());
|
||||
client = ClientPool.backendPool.borrowObject(address);
|
||||
client.publishTopicInfo(request);
|
||||
ok = true;
|
||||
LOG.info("publish topic info to be {} success, time cost={} ms",
|
||||
be.getHost(), (System.currentTimeMillis() - beginTime));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
|
||||
be.getHost(), (System.currentTimeMillis() - beginTime), e);
|
||||
} finally {
|
||||
try {
|
||||
if (ok) {
|
||||
ClientPool.backendPool.returnObject(address, client);
|
||||
} else {
|
||||
ClientPool.backendPool.invalidateObject(address, client);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("recycle topic publish client failed. related backend[{}]", be.getHost(), e);
|
||||
}
|
||||
handler.onResponse(be);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user