diff --git a/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java b/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java index cf32d72..14afce8 100644 --- a/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java +++ b/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java @@ -4,6 +4,7 @@ import org.postgresql.clusterhealthy.ClusterNodeCache; import org.postgresql.core.QueryExecutor; import org.postgresql.log.Log; import org.postgresql.log.Logger; +import org.postgresql.util.HostSpec; import java.util.ArrayList; import java.util.HashMap; @@ -58,6 +59,10 @@ public class GlobalConnectionTracker { hostConnection.put(identityQueryExecute, queryExecutor); connectionManager.put(hostSpec, hostConnection); } + if (isTargetServerMaster(props)) { + HostSpec[] hostSpecs = Driver.GetHostSpecs(props); + ClusterNodeCache.pushHostSpecs(queryExecutor.getHostSpec(), hostSpecs, props); + } } /** @@ -80,11 +85,11 @@ public class GlobalConnectionTracker { } else { LOGGER.info("[SWITCHOVER] The identity of the queryExecutor has changed!"); } + ClusterNodeCache.updateDetection(); } else { LOGGER.info("[SWITCHOVER] No connection found under this host!"); } } - ClusterNodeCache.updateDetection(); } /** @@ -118,7 +123,7 @@ public class GlobalConnectionTracker { public static void closeConnectionOfCrash(String hostSpec) { synchronized (connectionManager) { HashMap hostConnection = connectionManager.getOrDefault(hostSpec, null); - if (hostConnection != null) { + if (hostConnection != null && !hostConnection.isEmpty()) { LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, start to close the original connection."); for (QueryExecutor queryExecutor : hostConnection.values()) { if (queryExecutor != null && !queryExecutor.isClosed()) { diff --git a/pgjdbc/src/main/java/org/postgresql/PGProperty.java b/pgjdbc/src/main/java/org/postgresql/PGProperty.java index 4675e42..df969e5 100644 --- a/pgjdbc/src/main/java/org/postgresql/PGProperty.java +++ b/pgjdbc/src/main/java/org/postgresql/PGProperty.java @@ -535,7 +535,18 @@ public enum PGProperty { /** * It is used to detect the thread interval of the survival task on the primary node in the high availability scenario. */ - HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time") + HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time"), + + /** + * In the scenario where heartbeat maintenance is enabled for the active node, + * if the active node is down, set the timeout threshold for searching for the active node. + * If the active node is not detected within this timeout period, + * the cluster is considered to have no active node and no maintenance is performed on the current cluster. + * This time should include the RTO time of the active node。 + */ + MASTER_FAILURE_HEARTBEAT_TIMEOUT("masterFailureHeartbeatTimeout", "30000", "In the scenario where heartbeat maintenance is enabled for the active node, " + + "if the active node is down, set the timeout threshold for searching for the active node. If the active node is not detected within this timeout period, " + + "the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node.") ; diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java index c79e143..fad5279 100644 --- a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java @@ -33,7 +33,6 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Date; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -55,6 +54,7 @@ public class ClusterHeartBeat { private final ConnectionFactoryImpl FACTORY = new ConnectionFactoryImpl(); private volatile boolean detection; private final Long DEFAULT_INTERVAL = 5000L; + private final String DEFAULT_TIMEOUT = "30000"; private volatile AtomicLong periodTime = new AtomicLong(DEFAULT_INTERVAL); @@ -64,8 +64,9 @@ public class ClusterHeartBeat { public void masterNodeProbe() { while (isOpen()) { // Detects whether the loop is broken - if (detection && !GlobalConnectionTracker.hasConnection()) { + if (detection && ClusterHeartBeatFailureCluster.getInstance().failureCluster.isEmpty() && !GlobalConnectionTracker.hasConnection()) { ClusterNodeCache.stop(); + LOGGER.debug("heartBeat thread stop"); break; } LOGGER.debug("heartBeat thread start time: " + new Date(System.currentTimeMillis())); @@ -124,6 +125,12 @@ public class ClusterHeartBeat { long time = Long.parseLong(period); periodTime.set(Math.min(periodTime.get(), time)); } + String timeout = PGProperty.MASTER_FAILURE_HEARTBEAT_TIMEOUT.get(properties); + if (!ClusterNodeCache.isNumeric(timeout)) { + LOGGER.debug("Invalid heartbeatPeriod value: " + timeout); + timeout = DEFAULT_TIMEOUT; + } + ClusterHeartBeatFailureCluster.getInstance().setThresholdValue((int) (Long.parseLong(timeout) / periodTime.get())); } /** @@ -254,14 +261,14 @@ public class ClusterHeartBeat { * @param slaves slaves set * @param props the parsed/defaulted connection properties */ - public void cacheProcess(HostSpec hostSpec, Set slaves, Set props) { + public void cacheProcess(HostSpec hostSpec, Set slaves, Set props, Integer frequency) { HostSpec maseterNode = findMasterNode(slaves, props); removeClusterNode(hostSpec, maseterNode, slaves); if (maseterNode != null) { addProperties(maseterNode, props); ClusterHeartBeatFailureMaster.getInstance().addFailureMaster(hostSpec, maseterNode); } else { - FailureCluster cluster = new FailureCluster(hostSpec, slaves, props); + FailureCluster cluster = new FailureCluster(hostSpec, slaves, props, frequency); ClusterHeartBeatFailureCluster.getInstance().addFailureCluster(cluster); } GlobalConnectionTracker.closeConnectionOfCrash(hostSpec.toString()); diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java index 4ac4404..6019d31 100644 --- a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java @@ -32,6 +32,7 @@ import java.util.Set; public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{ public List failureCluster = new ArrayList<>(); + private int thresholdValue; private volatile static ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster; private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureCluster.class.getName()); private ClusterHeartBeatFailureCluster() { @@ -80,7 +81,10 @@ public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{ if (count == salves.size()) { continue; } - cacheProcess(cluster.getMaster(), salves, cluster.getProps()); + int frequency = cluster.getFrequency(); + if (thresholdValue > frequency) { + cacheProcess(cluster.getMaster(), salves, cluster.getProps(), ++ frequency); + } } if (queryExecutor != null) { boolean isMaster = nodeRoleIsMaster(queryExecutor); @@ -112,4 +116,8 @@ public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{ failureCluster.clear(); } + public void setThresholdValue(int thresholdValue) { + this.thresholdValue = thresholdValue; + } + } diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java index 94120cf..810a733 100644 --- a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java @@ -71,13 +71,13 @@ public class ClusterHeartBeatMaster extends ClusterHeartBeat { queryExecutor = super.getQueryExecutor(master, propertiesSet); } catch (SQLException e) { LOGGER.debug("acquire QueryExecutor failure"); - super.cacheProcess(master, slaves, propertiesSet); + super.cacheProcess(master, slaves, propertiesSet, null); continue; } LOGGER.debug("Information about the current connected node " + queryExecutor.getSocketAddress()); if (!super.nodeRoleIsMaster(queryExecutor)) { LOGGER.debug(master + ":The host is degraded to the standby server."); - super.cacheProcess(master, slaves, propertiesSet); + super.cacheProcess(master, slaves, propertiesSet, null); } } } diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java index 92e2424..ce80c87 100644 --- a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java @@ -87,17 +87,16 @@ public class ClusterNodeCache { Set set = Arrays.stream(hostSpecs) .collect(Collectors.toSet()); String period = PGProperty.HEARTBEAT_PERIOD.get(properties); - boolean open = true; if (!isNumeric(period)) { - open = false; LOGGER.debug("Invalid heartbeatPeriod value: " + period); + return; } long timePeriod = Long.parseLong(period); if (timePeriod <= 0) { - open = false; LOGGER.debug("Invalid heartbeatPeriod value: " + period); + return; } - if (set.size() > 1 && open) { + if (set.size() > 1) { CLUSTER_HEART_BEAT.addNodeRelationship(master, hostSpecs, properties); start(); } @@ -127,10 +126,11 @@ public class ClusterNodeCache { CLUSTER_HEART_BEAT.clear(); CLUSTER_HEART_BEAT.initPeriodTime(); executorService.shutdown(); + executorService = Executors.newSingleThreadExecutor(); } } - private static boolean isNumeric(final CharSequence cs) { + public static boolean isNumeric(final CharSequence cs) { if (cs.length() == 0) { return false; } diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java index 9af78d1..6eae3df 100644 --- a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java @@ -27,6 +27,7 @@ public class FailureCluster { private HostSpec master; private Set salves; private Set props; + private int frequency; /** * @@ -34,10 +35,19 @@ public class FailureCluster { * @param salves Slave set * @param props Connection information */ - public FailureCluster(HostSpec master, Set salves, Set props) { + public FailureCluster(HostSpec master, Set salves, Set props, Integer frequency) { this.master = master; this.salves = salves; this.props = props; + this.frequency = null == frequency ? 0 : frequency; + } + + public int getFrequency() { + return frequency; + } + + public void setFrequency(int frequency) { + this.frequency = frequency; } public void setSalves(Set salves) { @@ -63,4 +73,12 @@ public class FailureCluster { public Set getProps() { return props; } + + @Override + public String toString() { + return "FailureCluster{" + + "master=" + master + + ", salves=" + salves + + '}'; + } } diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java index 21fa3ae..0385865 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java +++ b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java @@ -330,9 +330,6 @@ public class ConnectionFactoryImpl extends ConnectionFactory { if (candidateHost.targetServerType != HostRequirement.any) { hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Secondary; LOGGER.info("Known status of host " + hostSpec + " is " + hostStatus); - if (targetServerType == HostRequirement.master && hostStatus == HostStatus.Master) { - ClusterNodeCache.pushHostSpecs(hostSpec, currentHostSpecs, info); - } } GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus, info); knownStates.put(hostSpec, hostStatus); diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java index b787f1b..656c444 100644 --- a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java @@ -28,7 +28,7 @@ public class ClusterHeartBeatFailureClusterTest { hostSpecs.remove(master); HashSet set = new HashSet<>(); set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); - FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set); + FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set, 0); clusterHeartBeatFailureCluster.addFailureCluster(failureCluster); System.out.println(clusterHeartBeatFailureCluster.getFailureCluster()); clusterHeartBeatFailureCluster.run();