From ac01da49847a5ad7e584fe583f467d2e91ef7bf9 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 10 Dec 2018 18:52:51 +0800 Subject: [PATCH] Clear client pool when heartbeat failed (#408) When a heartbeat fails, we should clear the connections cached in the client pool; otherwise we will get broken connections from the pool. Since we don't have REOPEN logic (which may cause ugly code style), a broken connection may cause an RPC to block and fail. So clearing them all and recreating them when needed is a simple way to resolve this problem. We only clear connections in the backend and broker pools. There is no need to clear the heartbeat pool because heartbeats are very frequent, so its connections can be invalidated automatically. --- .../java/org/apache/doris/common/GenericPool.java | 12 ++++++++---- .../java/org/apache/doris/system/HeartbeatMgr.java | 14 +++++++++----- .../org/apache/doris/system/SystemInfoService.java | 14 -------------- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/GenericPool.java b/fe/src/main/java/org/apache/doris/common/GenericPool.java index 549a6dabb7..a38cc4fd41 100644 --- a/fe/src/main/java/org/apache/doris/common/GenericPool.java +++ b/fe/src/main/java/org/apache/doris/common/GenericPool.java @@ -17,8 +17,6 @@ package org.apache.doris.common; -import java.lang.reflect.Constructor; - import org.apache.doris.thrift.TNetworkAddress; import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; @@ -26,13 +24,15 @@ import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; 
import org.apache.thrift.transport.TTransportException; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; + +import java.lang.reflect.Constructor; public class GenericPool { private static final Logger LOG = LogManager.getLogger(GenericPool.class); @@ -73,6 +73,10 @@ public class GenericPool { return ok; } + public void clearPool(TNetworkAddress addr) { + pool.clear(addr); + } + public boolean peak(VALUE object) { return object.getOutputProtocol().getTransport().peek(); } diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7c8d422636..ee3e84bcc7 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -26,7 +26,6 @@ import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.Util; import org.apache.doris.http.rest.BootstrapFinishAction; import org.apache.doris.persist.HbPackage; -import org.apache.doris.system.BackendEvent.BackendEventType; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TBackendInfo; @@ -165,9 +164,9 @@ public class HeartbeatMgr extends Daemon { Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { boolean isChanged = be.handleHbResponse(hbResponse); - if (hbResponse.getStatus() != HbStatus.OK && !isReplay) { - nodeMgr.getEventBus().post(new BackendEvent(BackendEventType.BACKEND_DOWN, - "missing heartbeat", Long.valueOf(hbResponse.getBeId()))); + if (hbResponse.getStatus() != HbStatus.OK) { + // invalid all connections cached in ClientPool + ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); } return isChanged; } @@ -178,7 +177,12 @@ public class HeartbeatMgr extends Daemon { FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker( hbResponse.getName(), 
hbResponse.getHost(), hbResponse.getPort()); if (broker != null) { - return broker.handleHbResponse(hbResponse); + boolean isChanged = broker.handleHbResponse(hbResponse); + if (hbResponse.getStatus() != HbStatus.OK) { + // invalid all connections cached in ClientPool + ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port)); + } + return isChanged; } break; } diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index f8e9e36d25..1fb198e1de 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -26,7 +26,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend.BackendState; -import org.apache.doris.system.BackendEvent.BackendEventType; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -35,7 +34,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.eventbus.EventBus; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; @@ -65,8 +63,6 @@ public class SystemInfoService { private volatile AtomicReference> idToBackendRef; private volatile AtomicReference> idToReportVersionRef; - private final EventBus eventBus; - // last backend id used by round robin for sequential choosing backends for // tablet creation private ConcurrentHashMap lastBackendIdForCreationMap; @@ -94,16 +90,10 @@ public class SystemInfoService { idToReportVersionRef = new AtomicReference>( ImmutableMap. 
of()); - eventBus = new EventBus("backendEvent"); - lastBackendIdForCreationMap = new ConcurrentHashMap(); lastBackendIdForOtherMap = new ConcurrentHashMap(); } - public EventBus getEventBus() { - return this.eventBus; - } - // for deploy manager public void addBackends(List> hostPortPairs, boolean isFree) throws DdlException { addBackends(hostPortPairs, isFree, ""); @@ -217,10 +207,6 @@ public class SystemInfoService { Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); - // publish - eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED, "backend has been dropped", - Long.valueOf(droppedBackend.getId()))); - // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); copiedBackends.remove(droppedBackend.getId());