!125 [Backport to master] Fix the exit problem of the cluster primary-node heartbeat thread

Merge pull request !125 from 陈紫阳/master
opengauss-bot
2023-03-27 12:19:21 +00:00
committed by Gitee
9 changed files with 66 additions and 20 deletions

GlobalConnectionTracker.java

@ -4,6 +4,7 @@ import org.postgresql.clusterhealthy.ClusterNodeCache;
import org.postgresql.core.QueryExecutor;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import java.util.ArrayList;
import java.util.HashMap;
@ -58,6 +59,10 @@ public class GlobalConnectionTracker {
hostConnection.put(identityQueryExecute, queryExecutor);
connectionManager.put(hostSpec, hostConnection);
}
if (isTargetServerMaster(props)) {
HostSpec[] hostSpecs = Driver.GetHostSpecs(props);
ClusterNodeCache.pushHostSpecs(queryExecutor.getHostSpec(), hostSpecs, props);
}
}
/**
@ -80,11 +85,11 @@ public class GlobalConnectionTracker {
} else {
LOGGER.info("[SWITCHOVER] The identity of the queryExecutor has changed!");
}
ClusterNodeCache.updateDetection();
} else {
LOGGER.info("[SWITCHOVER] No connection found under this host!");
}
}
ClusterNodeCache.updateDetection();
}
/**
@ -118,7 +123,7 @@ public class GlobalConnectionTracker {
public static void closeConnectionOfCrash(String hostSpec) {
synchronized (connectionManager) {
HashMap<Integer, QueryExecutor> hostConnection = connectionManager.getOrDefault(hostSpec, null);
if (hostConnection != null) {
if (hostConnection != null && !hostConnection.isEmpty()) {
LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, start to close the original connection.");
for (QueryExecutor queryExecutor : hostConnection.values()) {
if (queryExecutor != null && !queryExecutor.isClosed()) {
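The block added in the first hunk registers the driver's full host list with ClusterNodeCache, but only when isTargetServerMaster(props) holds. That helper's body is not part of this diff; presumably it inspects the targetServerType connection property, roughly along these lines (a sketch under that assumption, not the project's actual implementation):

    // Hypothetical reconstruction: true only when the connection was opened with
    // targetServerType=master, assuming the PgJDBC-style PGProperty.TARGET_SERVER_TYPE
    // constant ("targetServerType", default "any") is available in this fork.
    private static boolean isTargetServerMaster(Properties props) {
        return "master".equals(PGProperty.TARGET_SERVER_TYPE.get(props));
    }

Under that reading, heartbeat bookkeeping is started only for connections that explicitly target the primary, which mirrors the targetServerType == HostRequirement.master check this commit removes from ConnectionFactoryImpl further down.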

PGProperty.java

@ -535,7 +535,18 @@ public enum PGProperty {
/**
* It is used to detect the thread interval of the survival task on the primary node in the high availability scenario.
*/
HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time")
HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time"),
/**
* In the scenario where heartbeat maintenance is enabled for the active node,
* if the active node is down, set the timeout threshold for searching for the active node.
* If the active node is not detected within this timeout period,
* the cluster is considered to have no active node and no maintenance is performed on the current cluster.
* This time should include the RTO time of the active node.
*/
MASTER_FAILURE_HEARTBEAT_TIMEOUT("masterFailureHeartbeatTimeout", "30000", "In the scenario where heartbeat maintenance is enabled for the active node, " +
"if the active node is down, set the timeout threshold for searching for the active node. If the active node is not detected within this timeout period, " +
"the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node.")
;
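For context, a minimal client-side sketch showing the existing heartbeatPeriod property together with the new masterFailureHeartbeatTimeout property. Only the two property names and their defaults come from this diff; the URL, host names, credentials, and chosen values are placeholders (newer openGauss drivers may use the jdbc:opengauss:// scheme instead):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class HeartbeatConfigExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("user", "exampleUser");                    // placeholder credentials
            props.setProperty("password", "examplePassword");
            props.setProperty("targetServerType", "master");             // heartbeat tracking targets primary connections
            props.setProperty("heartbeatPeriod", "5000");                // probe every 5 s; the default "0" leaves the heartbeat off
            props.setProperty("masterFailureHeartbeatTimeout", "60000"); // stop searching for a new primary after 60 s (should cover the cluster RTO)
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://node1:5432,node2:5432,node3:5432/postgres", props)) {
                System.out.println("connected: " + !conn.isClosed());
            }
        }
    }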

ClusterHeartBeat.java

@ -33,7 +33,6 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -55,6 +54,7 @@ public class ClusterHeartBeat {
private final ConnectionFactoryImpl FACTORY = new ConnectionFactoryImpl();
private volatile boolean detection;
private final Long DEFAULT_INTERVAL = 5000L;
private final String DEFAULT_TIMEOUT = "30000";
private volatile AtomicLong periodTime = new AtomicLong(DEFAULT_INTERVAL);
@ -64,8 +64,9 @@ public class ClusterHeartBeat {
public void masterNodeProbe() {
while (isOpen()) {
// Detects whether the loop is broken
if (detection && !GlobalConnectionTracker.hasConnection()) {
if (detection && ClusterHeartBeatFailureCluster.getInstance().failureCluster.isEmpty() && !GlobalConnectionTracker.hasConnection()) {
ClusterNodeCache.stop();
LOGGER.debug("heartBeat thread stop");
break;
}
LOGGER.debug("heartBeat thread start time: " + new Date(System.currentTimeMillis()));
@ -124,6 +125,12 @@ public class ClusterHeartBeat {
long time = Long.parseLong(period);
periodTime.set(Math.min(periodTime.get(), time));
}
String timeout = PGProperty.MASTER_FAILURE_HEARTBEAT_TIMEOUT.get(properties);
if (!ClusterNodeCache.isNumeric(timeout)) {
LOGGER.debug("Invalid heartbeatPeriod value: " + timeout);
timeout = DEFAULT_TIMEOUT;
}
ClusterHeartBeatFailureCluster.getInstance().setThresholdValue((int) (Long.parseLong(timeout) / periodTime.get()));
}
/**
@ -254,14 +261,14 @@ public class ClusterHeartBeat {
* @param slaves slaves set
* @param props the parsed/defaulted connection properties
*/
public void cacheProcess(HostSpec hostSpec, Set<HostSpec> slaves, Set<Properties> props) {
public void cacheProcess(HostSpec hostSpec, Set<HostSpec> slaves, Set<Properties> props, Integer frequency) {
HostSpec maseterNode = findMasterNode(slaves, props);
removeClusterNode(hostSpec, maseterNode, slaves);
if (maseterNode != null) {
addProperties(maseterNode, props);
ClusterHeartBeatFailureMaster.getInstance().addFailureMaster(hostSpec, maseterNode);
} else {
FailureCluster cluster = new FailureCluster(hostSpec, slaves, props);
FailureCluster cluster = new FailureCluster(hostSpec, slaves, props, frequency);
ClusterHeartBeatFailureCluster.getInstance().addFailureCluster(cluster);
}
GlobalConnectionTracker.closeConnectionOfCrash(hostSpec.toString());
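Read together, the new timeout handling above and the added frequency parameter form a simple retry budget: setThresholdValue stores how many heartbeat rounds fit into masterFailureHeartbeatTimeout, and every re-queued FailureCluster carries the number of rounds already spent on it. A worked example using the defaults visible in this commit (an illustrative reading of the diff, not code from the repository):

    // Defaults from this diff: periodTime = 5000 ms, masterFailureHeartbeatTimeout = 30000 ms.
    long periodTime = 5000L;
    long timeout = 30000L;
    int thresholdValue = (int) (timeout / periodTime);  // 6 probe rounds before giving up
    // ClusterHeartBeatFailureCluster re-queues a failed cluster only while
    // thresholdValue > frequency, incrementing frequency on each round, so after
    // about 6 rounds (~30 s) the cluster is dropped. Combined with the extra
    // failureCluster.isEmpty() check in masterNodeProbe, the heartbeat thread
    // neither exits while a failed cluster is still being probed nor probes an
    // unrecoverable cluster forever.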

ClusterHeartBeatFailureCluster.java

@ -32,6 +32,7 @@ import java.util.Set;
public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{
public List<FailureCluster> failureCluster = new ArrayList<>();
private int thresholdValue;
private volatile static ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster;
private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureCluster.class.getName());
private ClusterHeartBeatFailureCluster() {
@ -80,7 +81,10 @@ public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{
if (count == salves.size()) {
continue;
}
cacheProcess(cluster.getMaster(), salves, cluster.getProps());
int frequency = cluster.getFrequency();
if (thresholdValue > frequency) {
cacheProcess(cluster.getMaster(), salves, cluster.getProps(), ++frequency);
}
}
if (queryExecutor != null) {
boolean isMaster = nodeRoleIsMaster(queryExecutor);
@ -112,4 +116,8 @@ public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{
failureCluster.clear();
}
public void setThresholdValue(int thresholdValue) {
this.thresholdValue = thresholdValue;
}
}

ClusterHeartBeatMaster.java

@ -71,13 +71,13 @@ public class ClusterHeartBeatMaster extends ClusterHeartBeat {
queryExecutor = super.getQueryExecutor(master, propertiesSet);
} catch (SQLException e) {
LOGGER.debug("acquire QueryExecutor failure");
super.cacheProcess(master, slaves, propertiesSet);
super.cacheProcess(master, slaves, propertiesSet, null);
continue;
}
LOGGER.debug("Information about the current connected node " + queryExecutor.getSocketAddress());
if (!super.nodeRoleIsMaster(queryExecutor)) {
LOGGER.debug(master + ":The host is degraded to the standby server.");
super.cacheProcess(master, slaves, propertiesSet);
super.cacheProcess(master, slaves, propertiesSet, null);
}
}
}

ClusterNodeCache.java

@ -87,17 +87,16 @@ public class ClusterNodeCache {
Set<HostSpec> set = Arrays.stream(hostSpecs)
.collect(Collectors.toSet());
String period = PGProperty.HEARTBEAT_PERIOD.get(properties);
boolean open = true;
if (!isNumeric(period)) {
open = false;
LOGGER.debug("Invalid heartbeatPeriod value: " + period);
return;
}
long timePeriod = Long.parseLong(period);
if (timePeriod <= 0) {
open = false;
LOGGER.debug("Invalid heartbeatPeriod value: " + period);
return;
}
if (set.size() > 1 && open) {
if (set.size() > 1) {
CLUSTER_HEART_BEAT.addNodeRelationship(master, hostSpecs, properties);
start();
}
@ -127,10 +126,11 @@ public class ClusterNodeCache {
CLUSTER_HEART_BEAT.clear();
CLUSTER_HEART_BEAT.initPeriodTime();
executorService.shutdown();
executorService = Executors.newSingleThreadExecutor();
}
}
private static boolean isNumeric(final CharSequence cs) {
public static boolean isNumeric(final CharSequence cs) {
if (cs.length() == 0) {
return false;
}
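The extra line in stop() is what makes a later restart possible: a java.util.concurrent.ExecutorService permanently rejects new tasks once shutdown() has been called, so without creating a fresh single-thread executor the next call to start() (visible earlier in this file's diff) would presumably fail to schedule the probe again. A standalone illustration of that JDK behaviour, not code from this repository:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;

    public class ExecutorRestartDemo {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(() -> System.out.println("first heartbeat round"));
            executor.shutdown();
            try {
                executor.submit(() -> System.out.println("never runs"));   // throws: executor already shut down
            } catch (RejectedExecutionException e) {
                // A shut-down executor cannot be reused; a new one must be created,
                // which is exactly what the added line in ClusterNodeCache.stop() does.
                executor = Executors.newSingleThreadExecutor();
                executor.submit(() -> System.out.println("restarted on a fresh executor"));
            }
            executor.shutdown();
        }
    }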

FailureCluster.java

@ -27,6 +27,7 @@ public class FailureCluster {
private HostSpec master;
private Set<HostSpec> salves;
private Set<Properties> props;
private int frequency;
/**
*
@ -34,10 +35,19 @@ public class FailureCluster {
* @param salves Slave set
* @param props Connection information
*/
public FailureCluster(HostSpec master, Set<HostSpec> salves, Set<Properties> props) {
public FailureCluster(HostSpec master, Set<HostSpec> salves, Set<Properties> props, Integer frequency) {
this.master = master;
this.salves = salves;
this.props = props;
this.frequency = null == frequency ? 0 : frequency;
}
public int getFrequency() {
return frequency;
}
public void setFrequency(int frequency) {
this.frequency = frequency;
}
public void setSalves(Set<HostSpec> salves) {
@ -63,4 +73,12 @@ public class FailureCluster {
public Set<Properties> getProps() {
return props;
}
@Override
public String toString() {
return "FailureCluster{" +
"master=" + master +
", salves=" + salves +
'}';
}
}

ConnectionFactoryImpl.java

@ -330,9 +330,6 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
if (candidateHost.targetServerType != HostRequirement.any) {
hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Secondary;
LOGGER.info("Known status of host " + hostSpec + " is " + hostStatus);
if (targetServerType == HostRequirement.master && hostStatus == HostStatus.Master) {
ClusterNodeCache.pushHostSpecs(hostSpec, currentHostSpecs, info);
}
}
GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus, info);
knownStates.put(hostSpec, hostStatus);

ClusterHeartBeatFailureClusterTest.java

@ -28,7 +28,7 @@ public class ClusterHeartBeatFailureClusterTest {
hostSpecs.remove(master);
HashSet<Properties> set = new HashSet<>();
set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs()));
FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set);
FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set, 0);
clusterHeartBeatFailureCluster.addFailureCluster(failureCluster);
System.out.println(clusterHeartBeatFailureCluster.getFailureCluster());
clusterHeartBeatFailureCluster.run();