Clear client pool when heartbeat failed (#408)
When a heartbeat fails, we should clear the connections cached in the client pool; otherwise we will get broken connections from the pool. Since we don't have REOPEN logic (which may lead to ugly code), a broken connection may cause an RPC to block and fail. Clearing them all and recreating them when needed is a simple way to resolve this problem. We only clear connections in the backend and broker pools. There is no need to clear the heartbeat pool: heartbeats are very frequent, so its connections can be invalidated automatically.
This commit is contained in:
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
|
||||
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
|
||||
@ -26,13 +24,15 @@ import org.apache.commons.pool2.PooledObject;
|
||||
import org.apache.commons.pool2.impl.DefaultPooledObject;
|
||||
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
|
||||
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
|
||||
private static final Logger LOG = LogManager.getLogger(GenericPool.class);
|
||||
@ -73,6 +73,10 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
|
||||
return ok;
|
||||
}
|
||||
|
||||
/**
 * Clears the pooled client connections kept for the given address by
 * delegating to {@code pool.clear(addr)}.
 * NOTE(review): which instances are actually dropped (idle only vs. also
 * borrowed ones) is decided by the underlying pool's clear — confirm.
 *
 * @param addr network address whose pool entries are cleared
 */
public void clearPool(TNetworkAddress addr) {
|
||||
pool.clear(addr);
|
||||
}
|
||||
|
||||
/**
 * Returns the result of {@code peek()} on the transport behind the given
 * client's output protocol.
 * NOTE(review): the name "peak" looks like a misspelling of "peek" —
 * confirm with callers before renaming.
 *
 * @param object client whose output transport is probed
 * @return whatever the transport's {@code peek()} reports
 */
public boolean peak(VALUE object) {
|
||||
return object.getOutputProtocol().getTransport().peek();
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.http.rest.BootstrapFinishAction;
|
||||
import org.apache.doris.persist.HbPackage;
|
||||
import org.apache.doris.system.BackendEvent.BackendEventType;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
import org.apache.doris.thrift.HeartbeatService;
|
||||
import org.apache.doris.thrift.TBackendInfo;
|
||||
@ -165,9 +164,9 @@ public class HeartbeatMgr extends Daemon {
|
||||
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
|
||||
if (be != null) {
|
||||
boolean isChanged = be.handleHbResponse(hbResponse);
|
||||
if (hbResponse.getStatus() != HbStatus.OK && !isReplay) {
|
||||
nodeMgr.getEventBus().post(new BackendEvent(BackendEventType.BACKEND_DOWN,
|
||||
"missing heartbeat", Long.valueOf(hbResponse.getBeId())));
|
||||
if (hbResponse.getStatus() != HbStatus.OK) {
|
||||
// invalidate all connections cached in ClientPool
|
||||
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
|
||||
}
|
||||
return isChanged;
|
||||
}
|
||||
@ -178,7 +177,12 @@ public class HeartbeatMgr extends Daemon {
|
||||
FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(
|
||||
hbResponse.getName(), hbResponse.getHost(), hbResponse.getPort());
|
||||
if (broker != null) {
|
||||
return broker.handleHbResponse(hbResponse);
|
||||
boolean isChanged = broker.handleHbResponse(hbResponse);
|
||||
if (hbResponse.getStatus() != HbStatus.OK) {
|
||||
// invalidate all connections cached in ClientPool
|
||||
ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port));
|
||||
}
|
||||
return isChanged;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.system.Backend.BackendState;
|
||||
import org.apache.doris.system.BackendEvent.BackendEventType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -35,7 +34,6 @@ import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import org.apache.commons.validator.routines.InetAddressValidator;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -65,8 +63,6 @@ public class SystemInfoService {
|
||||
private volatile AtomicReference<ImmutableMap<Long, Backend>> idToBackendRef;
|
||||
private volatile AtomicReference<ImmutableMap<Long, AtomicLong>> idToReportVersionRef;
|
||||
|
||||
private final EventBus eventBus;
|
||||
|
||||
// last backend id used by round robin for sequential choosing backends for
|
||||
// tablet creation
|
||||
private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
|
||||
@ -94,16 +90,10 @@ public class SystemInfoService {
|
||||
idToReportVersionRef = new AtomicReference<ImmutableMap<Long, AtomicLong>>(
|
||||
ImmutableMap.<Long, AtomicLong> of());
|
||||
|
||||
eventBus = new EventBus("backendEvent");
|
||||
|
||||
lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
|
||||
lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
|
||||
}
|
||||
|
||||
public EventBus getEventBus() {
|
||||
return this.eventBus;
|
||||
}
|
||||
|
||||
// for deploy manager
|
||||
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws DdlException {
|
||||
addBackends(hostPortPairs, isFree, "");
|
||||
@ -217,10 +207,6 @@ public class SystemInfoService {
|
||||
|
||||
Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort);
|
||||
|
||||
// publish
|
||||
eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED, "backend has been dropped",
|
||||
Long.valueOf(droppedBackend.getId())));
|
||||
|
||||
// update idToBackend
|
||||
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
|
||||
copiedBackends.remove(droppedBackend.getId());
|
||||
|
||||
Reference in New Issue
Block a user