diff --git a/fe/src/main/java/org/apache/doris/common/GenericPool.java b/fe/src/main/java/org/apache/doris/common/GenericPool.java index 549a6dabb7..a38cc4fd41 100644 --- a/fe/src/main/java/org/apache/doris/common/GenericPool.java +++ b/fe/src/main/java/org/apache/doris/common/GenericPool.java @@ -17,8 +17,6 @@ package org.apache.doris.common; -import java.lang.reflect.Constructor; - import org.apache.doris.thrift.TNetworkAddress; import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; @@ -26,13 +24,15 @@ import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; + +import java.lang.reflect.Constructor; public class GenericPool { private static final Logger LOG = LogManager.getLogger(GenericPool.class); @@ -73,6 +73,10 @@ public class GenericPool { return ok; } + public void clearPool(TNetworkAddress addr) { + pool.clear(addr); + } + public boolean peak(VALUE object) { return object.getOutputProtocol().getTransport().peek(); } diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7c8d422636..ee3e84bcc7 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -26,7 +26,6 @@ import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.Util; import org.apache.doris.http.rest.BootstrapFinishAction; import org.apache.doris.persist.HbPackage; -import org.apache.doris.system.BackendEvent.BackendEventType; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TBackendInfo; @@ -165,9 +164,9 @@ public class HeartbeatMgr extends Daemon { Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { boolean isChanged = be.handleHbResponse(hbResponse); - if (hbResponse.getStatus() != HbStatus.OK && !isReplay) { - nodeMgr.getEventBus().post(new BackendEvent(BackendEventType.BACKEND_DOWN, - "missing heartbeat", Long.valueOf(hbResponse.getBeId()))); + if (hbResponse.getStatus() != HbStatus.OK) { + // invalid all connections cached in ClientPool + ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); } return isChanged; } @@ -178,7 +177,12 @@ public class HeartbeatMgr extends Daemon { FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker( hbResponse.getName(), hbResponse.getHost(), hbResponse.getPort()); if (broker != null) { - return broker.handleHbResponse(hbResponse); + boolean isChanged = broker.handleHbResponse(hbResponse); + if (hbResponse.getStatus() != HbStatus.OK) { + // invalid all connections cached in ClientPool + ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port)); + } + return isChanged; } break; } diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index f8e9e36d25..1fb198e1de 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -26,7 +26,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend.BackendState; -import org.apache.doris.system.BackendEvent.BackendEventType; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -35,7 +34,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.eventbus.EventBus; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; @@ -65,8 +63,6 @@ public class SystemInfoService { private volatile AtomicReference> idToBackendRef; private volatile AtomicReference> idToReportVersionRef; - private final EventBus eventBus; - // last backend id used by round robin for sequential choosing backends for // tablet creation private ConcurrentHashMap lastBackendIdForCreationMap; @@ -94,16 +90,10 @@ public class SystemInfoService { idToReportVersionRef = new AtomicReference>( ImmutableMap. of()); - eventBus = new EventBus("backendEvent"); - lastBackendIdForCreationMap = new ConcurrentHashMap(); lastBackendIdForOtherMap = new ConcurrentHashMap(); } - public EventBus getEventBus() { - return this.eventBus; - } - // for deploy manager public void addBackends(List> hostPortPairs, boolean isFree) throws DdlException { addBackends(hostPortPairs, isFree, ""); @@ -217,10 +207,6 @@ public class SystemInfoService { Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); - // publish - eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED, "backend has been dropped", - Long.valueOf(droppedBackend.getId()))); - // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); copiedBackends.remove(droppedBackend.getId());