心跳检测任务
This commit is contained in:
		| @ -5,7 +5,9 @@ import org.postgresql.jdbc.PgConnection; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
|  | ||||
| import java.util.ArrayList; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
|  | ||||
| @ -26,6 +28,16 @@ public class GlobalConnectionTracker { | ||||
|         return PGProperty.FORCE_TARGET_SERVER_SLAVE.getBoolean(props) && | ||||
|                 ("slave".equals(PGProperty.TARGET_SERVER_TYPE.get(props)) || "secondary".equals(PGProperty.TARGET_SERVER_TYPE.get(props))); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * | ||||
|      * @param props the parsed/defaulted connection properties | ||||
|      * @return | ||||
|      */ | ||||
|     private static boolean isTargetServerMaster(Properties props) { | ||||
|         return "master".equals(PGProperty.TARGET_SERVER_TYPE.get(props)); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Store the actual query executor and connection host spec. | ||||
|      * | ||||
| @ -33,7 +45,9 @@ public class GlobalConnectionTracker { | ||||
|      * @param queryExecutor | ||||
|      */ | ||||
|     public static void possessConnectionReference(QueryExecutor queryExecutor, Properties props) { | ||||
|         if(!isForceTargetServerSlave(props)) return; | ||||
|         if (!isForceTargetServerSlave(props) || !isTargetServerMaster(props)) { | ||||
|             return; | ||||
|         } | ||||
|         int identityQueryExecute = System.identityHashCode(queryExecutor); | ||||
|         String hostSpec = queryExecutor.getHostSpec().toString(); | ||||
|         synchronized (connectionManager) { | ||||
| @ -53,7 +67,9 @@ public class GlobalConnectionTracker { | ||||
|      * @param queryExecutor | ||||
|      */ | ||||
|     public static void releaseConnectionReference(QueryExecutor queryExecutor, Properties props) { | ||||
|         if(!isForceTargetServerSlave(props)) return; | ||||
|         if (!isForceTargetServerSlave(props)) { | ||||
|             return; | ||||
|         } | ||||
|         String hostSpec = queryExecutor.getHostSpec().toString(); | ||||
|         int identityQueryExecute = System.identityHashCode(queryExecutor); | ||||
|         synchronized (connectionManager) { | ||||
| @ -93,4 +109,46 @@ public class GlobalConnectionTracker { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Close all existing connections under the specified host. | ||||
|      * | ||||
|      * @param hostSpec ip and port. | ||||
|      */ | ||||
|     public static void closeConnectionOfCrash(String hostSpec) { | ||||
|         synchronized (connectionManager) { | ||||
|             HashMap<Integer, QueryExecutor> hostConnection = connectionManager.getOrDefault(hostSpec, null); | ||||
|             if (hostConnection != null) { | ||||
|                 LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, start to close the original connection."); | ||||
|                 for (QueryExecutor queryExecutor : hostConnection.values()) { | ||||
|                     if (queryExecutor != null && !queryExecutor.isClosed()) { | ||||
|                         queryExecutor.close(); | ||||
|                         queryExecutor.setAvailability(false); | ||||
|                     } | ||||
|                 } | ||||
|                 hostConnection.clear(); | ||||
|                 LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, end to close the original connection."); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * get all existing connections under the specified host. | ||||
|      * | ||||
|      * @param hostSpec ip and port. | ||||
|      */ | ||||
|     public static List<QueryExecutor> getConnections(String hostSpec) { | ||||
|         synchronized (connectionManager) { | ||||
|             List<QueryExecutor> ret = new ArrayList<>(); | ||||
|             HashMap<Integer, QueryExecutor> hostConnection = connectionManager.getOrDefault(hostSpec, null); | ||||
|             if (hostConnection != null) { | ||||
|                 for (Map.Entry<Integer, QueryExecutor> queryExecutorEntry : hostConnection.entrySet()) { | ||||
|                     ret.add(queryExecutorEntry.getValue()); | ||||
|                 } | ||||
|             } | ||||
|             return ret; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
|  | ||||
| } | ||||
| @ -496,6 +496,11 @@ public enum PGProperty { | ||||
|           "", | ||||
|           "Factory class to instantiate factories for XML processing"), | ||||
|  | ||||
|   /** | ||||
|    * It is used to detect the thread interval of the survival task on the primary node in the high availability scenario. | ||||
|    */ | ||||
|   HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time") | ||||
|  | ||||
|   ; | ||||
|  | ||||
|   private String _name; | ||||
|  | ||||
| @ -0,0 +1,276 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.PGProperty; | ||||
| import org.postgresql.core.PGStream; | ||||
| import org.postgresql.core.QueryExecutor; | ||||
| import org.postgresql.core.SocketFactoryFactory; | ||||
| import org.postgresql.core.v3.ConnectionFactoryImpl; | ||||
| import org.postgresql.core.v3.QueryExecutorImpl; | ||||
| import org.postgresql.jdbc.SslMode; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import javax.net.SocketFactory; | ||||
| import java.io.IOException; | ||||
| import java.sql.SQLException; | ||||
| import java.util.*; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
|  | ||||
| import static org.postgresql.GlobalConnectionTracker.closeConnectionOfCrash; | ||||
| import static org.postgresql.GlobalConnectionTracker.getConnections; | ||||
| import static org.postgresql.clusterhealthy.ClusterNodeCache.isOpen; | ||||
| import static org.postgresql.util.PSQLState.CONNECTION_REJECTED; | ||||
|  | ||||
| /** | ||||
|  * Heartbeat detection, active detection of the primary node, and maintenance of cache after failure The next | ||||
|  * contest pair active detection of the failed node, detection of the cluster where the primary node has failed, | ||||
|  * and maintenance of the latest information to the cache set。 | ||||
|  */ | ||||
| public class ClusterHeartBeat { | ||||
|  | ||||
|     public static final Map<HostSpec, Set<Properties>> CLUSTER_PROPERTIES = new ConcurrentHashMap<>(); | ||||
|     private static Log LOGGER = Logger.getLogger(ClusterHeartBeat.class.getName()); | ||||
|     private static final ConnectionFactoryImpl FACTORY = new ConnectionFactoryImpl(); | ||||
|     private static final String UPDATE_TIME = "time"; | ||||
|     private volatile Long periodTime = 5000L; | ||||
|  | ||||
|  | ||||
|     /** | ||||
|      * heartbeat task thread. | ||||
|      */ | ||||
|     public void masterNodeProbe() { | ||||
|         while (isOpen()) { | ||||
|             LOGGER.debug("heartBeat thread start time: " + new Date(System.currentTimeMillis())); | ||||
|             // failed node detection | ||||
|             ClusterHeartBeatFailureMaster.getInstance().run(); | ||||
|  | ||||
|             // active primary node detection | ||||
|             ClusterHeartBeatMaster.getInstance().run(); | ||||
|             // The failed cluster seeks the primary node | ||||
|             ClusterHeartBeatFailureCluster.getInstance().run(); | ||||
|             try { | ||||
|                 Thread.sleep(periodTime); | ||||
|             } catch (InterruptedException e) { | ||||
|                 LOGGER.debug(e.getStackTrace()); | ||||
|             } | ||||
|         } | ||||
|         periodTime = 5000L; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * get parsed/defaulted connection properties | ||||
|      * @param hostSpec ip and port | ||||
|      * @return properties set | ||||
|      */ | ||||
|     public Set<Properties> getProperties(HostSpec hostSpec) { | ||||
|         synchronized (CLUSTER_PROPERTIES) { | ||||
|             return CLUSTER_PROPERTIES.computeIfAbsent(hostSpec, k -> new HashSet<>()); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public Map<HostSpec, Set<HostSpec>> getClusterRelationship () { | ||||
|         return ClusterHeartBeatMaster.getInstance().getClusterRelationship(); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Adding Cluster Information | ||||
|      * @param key master | ||||
|      * @param value secondary list | ||||
|      * @param properties the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public void addNodeRelationship (HostSpec key, HostSpec[] value, Properties properties) { | ||||
|         // update node host and ip | ||||
|         addClusterNode(key, value); | ||||
|         // update node properties | ||||
|         addProperties(key,  Collections.singleton(properties)); | ||||
|         if (PGProperty.HEARTBEAT_PERIOD.get(properties) != null) { | ||||
|             String period = PGProperty.HEARTBEAT_PERIOD.get(properties); | ||||
|             long time = Long.parseLong(period); | ||||
|             synchronized (UPDATE_TIME) { | ||||
|                 if (time > 0) { | ||||
|                     periodTime = Math.min(periodTime, time); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * | ||||
|      * @param hostSpec ip and port | ||||
|      * @param properties the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public void addProperties(HostSpec hostSpec, Set<Properties> properties) { | ||||
|         synchronized (CLUSTER_PROPERTIES) { | ||||
|             Set<Properties> propertiesSet = CLUSTER_PROPERTIES.get(hostSpec); | ||||
|             if (propertiesSet == null) { | ||||
|                 propertiesSet = new HashSet<>(); | ||||
|             } | ||||
|             propertiesSet.addAll(properties); | ||||
|             CLUSTER_PROPERTIES.put(hostSpec, propertiesSet); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Update the cluster node relationship | ||||
|      * @param key old master | ||||
|      * @param newKey new master | ||||
|      * @param slaves slaves set | ||||
|      */ | ||||
|     public void removeClusterNode(HostSpec key, HostSpec newKey, Set<HostSpec> slaves) { | ||||
|         ClusterHeartBeatMaster.getInstance().removeClusterNode(key, newKey, slaves); | ||||
|  | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Perform cache maintenance on cluster nodes connected to hosts | ||||
|      * @param hostSpecs the parsed/defaulted connection properties | ||||
|      * @param value slaves set | ||||
|      */ | ||||
|     public void addClusterNode(HostSpec hostSpecs, HostSpec... value) { | ||||
|         ClusterHeartBeatMaster.getInstance().addClusterNode(hostSpecs, value); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Get the cluster slave node | ||||
|      * @param hostSpec the parsed/defaulted connection properties | ||||
|      * @return slaves set | ||||
|      */ | ||||
|     public Set<HostSpec> getClusterSalveNode(HostSpec hostSpec) { | ||||
|         return ClusterHeartBeatMaster.getInstance().getClusterSalveNode(hostSpec); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * | ||||
|      * @param hostSpec ip and port | ||||
|      * @param properties the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public void removeProperties(HostSpec hostSpec, Properties properties) { | ||||
|         synchronized (CLUSTER_PROPERTIES) { | ||||
|             Set<Properties> propertiesSet = CLUSTER_PROPERTIES.getOrDefault(hostSpec, null); | ||||
|             if (propertiesSet != null) { | ||||
|                 propertiesSet.remove(properties); | ||||
|                 CLUSTER_PROPERTIES.put(hostSpec, propertiesSet); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * the node probes the activity by reflecting the tryConnect() method. | ||||
|      * | ||||
|      * @param hostSpec ip and port. | ||||
|      * @param propSet  the parsed/defaulted connection properties | ||||
|      * @return QueryExecutor | ||||
|      * @throws SQLException new sql exception | ||||
|      */ | ||||
|     public QueryExecutor getQueryExecutor(HostSpec hostSpec, Set<Properties> propSet) throws SQLException { | ||||
|         Properties props = null; | ||||
|         try { | ||||
|             for (Properties properties : propSet) { | ||||
|                 props = properties; | ||||
|                 SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(props); | ||||
|                 SslMode sslMode = SslMode.of(props); | ||||
|                 String user = props.getProperty("user", ""); | ||||
|                 String database = props.getProperty("PGDBNAME", ""); | ||||
|                 PGStream pgStream = FACTORY.tryConnect(user, database, props, socketFactory, hostSpec, sslMode); | ||||
|                 return new QueryExecutorImpl(pgStream, user, database, | ||||
|                         1000, props); | ||||
|             } | ||||
|         } catch (SQLException e) { | ||||
|             String sqlState = e.getSQLState(); | ||||
|             if (CONNECTION_REJECTED.getState().equals(sqlState) || "28P01".equals(sqlState)) { | ||||
|                 LOGGER.debug("node " + hostSpec + " is active, and connenction authentication fails."); | ||||
|                 LOGGER.debug("remove before propSet size :" + propSet.size()); | ||||
|                 removeProperties(hostSpec, props); | ||||
|                 LOGGER.debug("remove after propSet size :" + propSet.size()); | ||||
|             } | ||||
|  | ||||
|             LOGGER.debug("acquire QueryExecutor failure " + e.getMessage()); | ||||
|         } catch (IOException e) { | ||||
|             LOGGER.debug("acquire QueryExecutor failure " + e.getMessage()); | ||||
|             LOGGER.debug(e.getCause()); | ||||
|         } | ||||
|         throw new SQLException(); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Check whether the node is the primary node | ||||
|      * | ||||
|      * @param queryExecutor queryExector | ||||
|      * @return true/false | ||||
|      */ | ||||
|     public boolean nodeRoleIsMaster(QueryExecutor queryExecutor) { | ||||
|         try { | ||||
|             return FACTORY.isMaster(queryExecutor); | ||||
|         } catch (SQLException | IOException e) { | ||||
|             LOGGER.debug("Error obtaining node role " + e.getMessage()); | ||||
|             LOGGER.debug(e.getStackTrace()); | ||||
|             return false; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Post-processing after the primary node fails | ||||
|      * | ||||
|      * @param hostSpec master ip and port | ||||
|      * @param slaves   slaves set | ||||
|      * @param props    the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public void cacheProcess(HostSpec hostSpec, Set<HostSpec> slaves, Set<Properties> props) { | ||||
|         HostSpec maseterNode = findMasterNode(slaves, props); | ||||
|         removeClusterNode(hostSpec, maseterNode, slaves); | ||||
|         if (maseterNode != null) { | ||||
|             addProperties(maseterNode, props); | ||||
|             ClusterHeartBeatFailureMaster.getInstance().addFailureMaster(hostSpec, maseterNode); | ||||
|         } else { | ||||
|             FailureCluster cluster = new FailureCluster(hostSpec, slaves, props); | ||||
|             ClusterHeartBeatFailureCluster.getInstance().addFailureCluster(cluster); | ||||
|         } | ||||
|         closeConnectionOfCrash(hostSpec.toString()); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Locate the host on the standby computer | ||||
|      * | ||||
|      * @param hostSpecSet slaves set | ||||
|      * @param properties  the parsed/defaulted connection properties | ||||
|      * @return new master node | ||||
|      */ | ||||
|     public HostSpec findMasterNode(Set<HostSpec> hostSpecSet, Set<Properties> properties) { | ||||
|         for (HostSpec hostSpec : hostSpecSet) { | ||||
|             List<QueryExecutor> queryExecutorList = getConnections(hostSpec.toString()); | ||||
|             for (QueryExecutor executor : queryExecutorList) { | ||||
|                 if (!executor.isClosed() && nodeRoleIsMaster(executor)) { | ||||
|                     return hostSpec; | ||||
|                 } | ||||
|             } | ||||
|             QueryExecutor queryExecutor = null; | ||||
|             try { | ||||
|                 queryExecutor = getQueryExecutor(hostSpec, properties); | ||||
|             } catch (SQLException e) { | ||||
|                 continue; | ||||
|             } | ||||
|             boolean isMaster = nodeRoleIsMaster(queryExecutor); | ||||
|             if (isMaster) { | ||||
|                 return hostSpec; | ||||
|             } | ||||
|         } | ||||
|         return null; | ||||
|     } | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,107 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.core.QueryExecutor; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.sql.SQLException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.Set; | ||||
|  | ||||
| /** | ||||
|  * After the primary node breaks down during heartbeat maintenance, | ||||
|  * the cluster is in the no-host state. This section describes how to detect hosts in a cluster without hosts. | ||||
|  */ | ||||
| public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{ | ||||
|  | ||||
|     public static List<FailureCluster> failureCluster = new ArrayList<>(); | ||||
|     private volatile static ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster; | ||||
|     private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureCluster.class.getName()); | ||||
|     private ClusterHeartBeatFailureCluster() { | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public static synchronized ClusterHeartBeatFailureCluster getInstance() { | ||||
|         if (clusterHeartBeatFailureCluster == null) { | ||||
|             clusterHeartBeatFailureCluster = new ClusterHeartBeatFailureCluster(); | ||||
|         } | ||||
|         return clusterHeartBeatFailureCluster; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * After the active node fails, find a new active node on the standby node or check whether the active node is resurrected | ||||
|      */ | ||||
|     public void run() { | ||||
|         if (failureCluster.isEmpty()) { | ||||
|             return; | ||||
|         } | ||||
|         List<FailureCluster> list = new ArrayList<>(failureCluster); | ||||
|         failureCluster.clear(); | ||||
|         LOGGER.debug("cluster does not have a master node" + list); | ||||
|         for (FailureCluster cluster : list) { | ||||
|             QueryExecutor queryExecutor = null; | ||||
|             try { | ||||
|                 if (cluster == null || cluster.getMaster() == null) { | ||||
|                     continue; | ||||
|                 } | ||||
|                 queryExecutor = getQueryExecutor(cluster.getMaster(), cluster.getProps()); | ||||
|             } catch (SQLException e) { | ||||
|                 Set<HostSpec> salves = cluster.getSalves(); | ||||
|                 int count = 0; | ||||
|                 for (HostSpec salf : salves) { | ||||
|                     try { | ||||
|                         getQueryExecutor(salf, cluster.getProps()); | ||||
|                     } catch (SQLException ex) { | ||||
|                         count++; | ||||
|                     } | ||||
|                 } | ||||
|                 if (count == salves.size()) { | ||||
|                     continue; | ||||
|                 } | ||||
|                 cacheProcess(cluster.getMaster(), salves, cluster.getProps()); | ||||
|             } | ||||
|             if (queryExecutor != null) { | ||||
|                 boolean isMaster = nodeRoleIsMaster(queryExecutor); | ||||
|                 if (isMaster) { | ||||
|                     addClusterNode(cluster.getMaster(), cluster.getSalves().toArray(new HostSpec[0])); | ||||
|                     addProperties(cluster.getMaster(), cluster.getProps()); | ||||
|                 } else { | ||||
|  | ||||
|                     HostSpec maseterNode = findMasterNode(cluster.getSalves(), cluster.getProps()); | ||||
|                     if (maseterNode != null) { | ||||
|                         addProperties(maseterNode, cluster.getProps()); | ||||
|                         Set<HostSpec> salves = cluster.getSalves(); | ||||
|                         salves.add(cluster.getMaster()); | ||||
|                         removeClusterNode(cluster.getMaster(), maseterNode, salves); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public void addFailureCluster(FailureCluster cluster) { | ||||
|         failureCluster.add(cluster); | ||||
|     } | ||||
|  | ||||
|     public List<FailureCluster> getFailureCluster() { | ||||
|         return failureCluster; | ||||
|     } | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,117 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.core.QueryExecutor; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.sql.SQLException; | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
|  | ||||
| /** | ||||
|  * If only one host in the cluster is down, and a new host is selected, | ||||
|  * the relationship between the faulty host and the new host needs to be maintained. | ||||
|  * When the faulty host recovers, it needs to be added to the node again for maintenance | ||||
|  */ | ||||
| public class ClusterHeartBeatFailureMaster extends ClusterHeartBeat{ | ||||
|  | ||||
|     public static Map<HostSpec, HostSpec> failureMap = new ConcurrentHashMap<>(); | ||||
|     private volatile static ClusterHeartBeatFailureMaster clusterHeartBeatFailureMaster; | ||||
|     private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureMaster.class.getName()); | ||||
|     private ClusterHeartBeatFailureMaster() { | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public static synchronized ClusterHeartBeatFailureMaster getInstance() { | ||||
|         if (clusterHeartBeatFailureMaster == null) { | ||||
|             clusterHeartBeatFailureMaster = new ClusterHeartBeatFailureMaster(); | ||||
|         } | ||||
|         return clusterHeartBeatFailureMaster; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * If the failed node is alive, join cache maintenance | ||||
|      */ | ||||
|     public void run() { | ||||
|         HashMap<HostSpec, HostSpec> failureMapClone = new HashMap<>(failureMap); | ||||
|         LOGGER.debug("failure node " + failureMapClone); | ||||
|         for (Map.Entry<HostSpec, HostSpec> next : failureMapClone.entrySet()) { | ||||
|             HostSpec key = next.getKey(); | ||||
|             HostSpec value = next.getValue(); | ||||
|             Set<Properties> properties = getProperties(key); | ||||
|             QueryExecutor queryExecutor = null; | ||||
|             try { | ||||
|                 queryExecutor = getQueryExecutor(key, properties); | ||||
|                 failureMap.remove(key); | ||||
|             } catch (SQLException e) { | ||||
|                 LOGGER.error(key.toString() + " tryConnect failure."); | ||||
|                 continue; | ||||
|             } | ||||
|             boolean isMaster = nodeRoleIsMaster(queryExecutor); | ||||
|             if (isMaster) { | ||||
|                 HostSpec current = value; | ||||
|                 while (failureMap.containsKey(current)) { | ||||
|                     current = failureMap.get(current); | ||||
|                 } | ||||
|                 if (getClusterRelationship().containsKey(current)) { | ||||
|                     Set<Properties> prop = getProperties(key); | ||||
|                     boolean currentIsMaster; | ||||
|                     try { | ||||
|                         QueryExecutor currentQueryExecutor = getQueryExecutor(current, prop); | ||||
|                         currentIsMaster = nodeRoleIsMaster(currentQueryExecutor); | ||||
|                     } catch (SQLException e) { | ||||
|                         currentIsMaster = false; | ||||
|                     } | ||||
|                     if (!currentIsMaster) { | ||||
|                         Set<HostSpec> set = getClusterSalveNode(current); | ||||
|                         set.add(current); | ||||
|                         addClusterNode(key, set.toArray(new HostSpec[0])); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|             } else { | ||||
|                 HostSpec current = value; | ||||
|                 while (failureMap.containsKey(current)) { | ||||
|                     if (current == failureMap.get(current)) { | ||||
|                         failureMap.remove(current); | ||||
|                         break; | ||||
|                     } | ||||
|                     current = failureMap.get(current); | ||||
|                 } | ||||
|                 addClusterNode(current, key); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public void addFailureMaster(HostSpec hostSpec, HostSpec maseterNode) { | ||||
|         failureMap.put(hostSpec, maseterNode); | ||||
|     } | ||||
|  | ||||
|     public Map<HostSpec, HostSpec> getFailureMaster() { | ||||
|         return failureMap; | ||||
|     } | ||||
|  | ||||
|     public void remove(HostSpec hostSpec) { | ||||
|         failureMap.remove(hostSpec); | ||||
|     } | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,131 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.core.QueryExecutor; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.sql.SQLException; | ||||
| import java.util.Arrays; | ||||
| import java.util.HashSet; | ||||
| import java.util.Iterator; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
|  | ||||
| import static org.postgresql.GlobalConnectionTracker.getConnections; | ||||
|  | ||||
| /** | ||||
|  * Maintain the primary node of the cluster. When the primary node breaks down, | ||||
|  * you need to detect a new host and bind the faulty host to the new host to facilitate | ||||
|  * quick switchover when the failed node is connected | ||||
|  */ | ||||
| public class ClusterHeartBeatMaster extends ClusterHeartBeat { | ||||
|  | ||||
|     public static final Map<HostSpec, Set<HostSpec>> CLUSTER_NODE_RELATIONSHIP = new ConcurrentHashMap<>(); | ||||
|     private volatile static ClusterHeartBeatMaster ClusterHeartBeatMaster; | ||||
|     private static Log LOGGER = Logger.getLogger(ClusterHeartBeatMaster.class.getName()); | ||||
|  | ||||
|     private ClusterHeartBeatMaster() { | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public static synchronized ClusterHeartBeatMaster getInstance() { | ||||
|         if (ClusterHeartBeatMaster == null) { | ||||
|             ClusterHeartBeatMaster = new ClusterHeartBeatMaster(); | ||||
|         } | ||||
|         return ClusterHeartBeatMaster; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * the primary node is active and added to the failure set after failure | ||||
|      */ | ||||
|     public void run() { | ||||
|         Iterator<Map.Entry<HostSpec, Set<HostSpec>>> iterator = CLUSTER_NODE_RELATIONSHIP.entrySet().iterator(); | ||||
|         LOGGER.debug("master nodes " + CLUSTER_NODE_RELATIONSHIP); | ||||
|         while (iterator.hasNext()) { | ||||
|             Map.Entry<HostSpec, Set<HostSpec>> nodeMap = iterator.next(); | ||||
|             HostSpec master = nodeMap.getKey(); | ||||
|             Set<HostSpec> slaves = nodeMap.getValue(); | ||||
|             LOGGER.debug("Current node " + master + " Standby node " + slaves); | ||||
|             QueryExecutor queryExecutor = null; | ||||
|             List<QueryExecutor> queryExecutorList = getConnections(master.toString()); | ||||
|             for (QueryExecutor executor : queryExecutorList) { | ||||
|                 if (!executor.isClosed()) { | ||||
|                     queryExecutor = executor; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             Set<Properties> propertiesSet = getProperties(master); | ||||
|             if (queryExecutor == null) { | ||||
|                 try { | ||||
|                     queryExecutor = super.getQueryExecutor(master, propertiesSet); | ||||
|                 } catch (SQLException e) { | ||||
|                     LOGGER.debug("acquire QueryExecutor failure"); | ||||
|                     super.cacheProcess(master, slaves, propertiesSet); | ||||
|                     continue; | ||||
|                 } | ||||
|             } | ||||
|             LOGGER.debug("Information about the current connected node " + queryExecutor.getSocketAddress()); | ||||
|             if (!super.nodeRoleIsMaster(queryExecutor)) { | ||||
|                 LOGGER.debug(master + ":The host is degraded to the standby server."); | ||||
|                 super.cacheProcess(master, slaves, propertiesSet); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public Map<HostSpec, Set<HostSpec>> getClusterRelationship() { | ||||
|         return CLUSTER_NODE_RELATIONSHIP; | ||||
|     } | ||||
|  | ||||
|     public void addClusterNode(HostSpec hostSpecs, HostSpec... value) { | ||||
|         synchronized (CLUSTER_NODE_RELATIONSHIP) { | ||||
|             Set<HostSpec> hostSpecSet = CLUSTER_NODE_RELATIONSHIP.computeIfAbsent(hostSpecs, k -> new HashSet<>()); | ||||
|             Arrays.stream(value) | ||||
|                     .filter(host -> !host.equals(hostSpecs)) | ||||
|                     .forEach(hostSpecSet::add); | ||||
|             CLUSTER_NODE_RELATIONSHIP.put(hostSpecs, hostSpecSet); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public void removeClusterNode(HostSpec key, HostSpec newKey, Set<HostSpec> slaves) { | ||||
|         synchronized (CLUSTER_NODE_RELATIONSHIP) { | ||||
|             CLUSTER_NODE_RELATIONSHIP.remove(key); | ||||
|             if (newKey != null) { | ||||
|                 Set<HostSpec> hostSpecSet = CLUSTER_NODE_RELATIONSHIP.get(newKey); | ||||
|                 if (hostSpecSet == null) { | ||||
|                     hostSpecSet = new HashSet<>(); | ||||
|                 } | ||||
|                 hostSpecSet.addAll(slaves); | ||||
|                 hostSpecSet.remove(newKey); | ||||
|                 CLUSTER_NODE_RELATIONSHIP.put(newKey, hostSpecSet); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public Set<HostSpec> get(HostSpec hostSpec) { | ||||
|         synchronized (CLUSTER_NODE_RELATIONSHIP) { | ||||
|             return CLUSTER_NODE_RELATIONSHIP.get(hostSpec); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,142 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.PGProperty; | ||||
| import org.postgresql.log.Log; | ||||
| import org.postgresql.log.Logger; | ||||
| import org.postgresql.util.GT; | ||||
| import org.postgresql.util.HostSpec; | ||||
| import org.postgresql.util.PSQLException; | ||||
| import org.postgresql.util.PSQLState; | ||||
|  | ||||
| import java.util.Arrays; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Executors; | ||||
| import java.util.stream.Collectors; | ||||
|  | ||||
| /** | ||||
|  *  Cluster information processing, cache connection information, whether to start the heartbeat thread | ||||
|  */ | ||||
| public class ClusterNodeCache { | ||||
|     private static volatile boolean status; | ||||
|     private static final String STATUS_LOCK = "status"; | ||||
|     private static Log LOGGER = Logger.getLogger(ClusterNodeCache.class.getName()); | ||||
|     private static final ExecutorService executorService = Executors.newCachedThreadPool(); | ||||
|     private static final ClusterHeartBeat CLUSTER_HEART_BEAT = new ClusterHeartBeat(); | ||||
|  | ||||
|     public static boolean isOpen() { | ||||
|         return status; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * The faulty host is switched to a new host | ||||
|      * @param hostSpecs the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public static void checkReplacement(HostSpec[] hostSpecs) { | ||||
|         if (hostSpecs == null) { | ||||
|             return; | ||||
|         } | ||||
|         ClusterHeartBeatFailureMaster failureMaster = ClusterHeartBeatFailureMaster.getInstance(); | ||||
|         Map<HostSpec, HostSpec> failureMap = failureMaster.getFailureMaster(); | ||||
|         for (int i = 0; i < hostSpecs.length; i++) { | ||||
|             while (failureMap.containsKey(hostSpecs[i])) { | ||||
|                 if (hostSpecs[i] == failureMap.get(hostSpecs[i])) { | ||||
|                     failureMaster.remove(hostSpecs[i]); | ||||
|                     return; | ||||
|                 } | ||||
|                 hostSpecs[i] = failureMap.get(hostSpecs[i]); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Verify parameters and replace the failed primary node | ||||
|      * @param hostSpecs cluster node | ||||
|      * @param properties the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public static void checkHostSpecs(HostSpec[] hostSpecs, Properties properties) throws PSQLException { | ||||
|         // check the interval of heartbeat threads. | ||||
|         Set<HostSpec> set = Arrays.stream(hostSpecs) | ||||
|                 .collect(Collectors.toSet()); | ||||
|         if (set.size() > 1) { | ||||
|             checkReplacement(hostSpecs); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * | ||||
|      * @param master master node | ||||
|      * @param hostSpecs cluster node | ||||
|      * @param properties the parsed/defaulted connection properties | ||||
|      */ | ||||
|     public static void pushHostSpecs(HostSpec master, HostSpec[] hostSpecs, Properties properties) { | ||||
|         Set<HostSpec> set = Arrays.stream(hostSpecs) | ||||
|                 .collect(Collectors.toSet()); | ||||
|         String period = PGProperty.HEARTBEAT_PERIOD.get(properties); | ||||
|         boolean open = true; | ||||
|         if (!isNumeric(period)) { | ||||
|             open = false; | ||||
|             LOGGER.info("Invalid heartbeatPeriod value: " + period); | ||||
|         } | ||||
|         long timePeriod = Long.parseLong(period); | ||||
|         if (timePeriod <= 0) { | ||||
|             open = false; | ||||
|             LOGGER.info("Invalid heartbeatPeriod value: " + period); | ||||
|         } | ||||
|         if (set.size() > 1 && open) { | ||||
|             CLUSTER_HEART_BEAT.addNodeRelationship(master, hostSpecs, properties); | ||||
|             start(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private static void start() { | ||||
|         if (status) { | ||||
|             LOGGER.info("heartbeat thread ----> started"); | ||||
|             return; | ||||
|         } | ||||
|         synchronized (STATUS_LOCK) { | ||||
|             if (status) { | ||||
|                 LOGGER.info("heartbeat thread ----> started"); | ||||
|                 return; | ||||
|             } | ||||
|             status = true; | ||||
|         } | ||||
|         executorService.execute(CLUSTER_HEART_BEAT::masterNodeProbe); | ||||
|     } | ||||
|  | ||||
|     public static void stop() { | ||||
|         status = false; | ||||
|     } | ||||
|  | ||||
|     private static boolean isNumeric(final CharSequence cs) { | ||||
|         if (cs.length() == 0) { | ||||
|             return false; | ||||
|         } | ||||
|         final int size = cs.length(); | ||||
|         for (int i = 0; i < size; i++) { | ||||
|             if (!Character.isDigit(cs.charAt(i))) { | ||||
|                 return false; | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,66 @@ | ||||
| /* | ||||
|  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. | ||||
|  * | ||||
|  * openGauss is licensed under Mulan PSL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PSL v2. | ||||
|  * You may obtain a copy of Mulan PSL v2 at: | ||||
|  * | ||||
|  *          http://license.coscl.org.cn/MulanPSL2 | ||||
|  * | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PSL v2 for more details. | ||||
|  */ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
|  | ||||
| /** | ||||
|  * Cluster information instance, including primary and secondary nodes and connection information | ||||
|  */ | ||||
| public class FailureCluster { | ||||
|     private HostSpec master; | ||||
|     private Set<HostSpec> salves; | ||||
|     private Set<Properties> props; | ||||
|  | ||||
|     /** | ||||
|      * | ||||
|      * @param master Current master node | ||||
|      * @param salves Slave set | ||||
|      * @param props Connection information | ||||
|      */ | ||||
|     public FailureCluster(HostSpec master, Set<HostSpec> salves, Set<Properties> props) { | ||||
|         this.master = master; | ||||
|         this.salves = salves; | ||||
|         this.props = props; | ||||
|     } | ||||
|  | ||||
|     public void setSalves(Set<HostSpec> salves) { | ||||
|         this.salves = salves; | ||||
|     } | ||||
|  | ||||
|     public void setProps(Set<Properties> props) { | ||||
|         this.props = props; | ||||
|     } | ||||
|  | ||||
|     public void setMaster(HostSpec master) { | ||||
|         this.master = master; | ||||
|     } | ||||
|  | ||||
|     public HostSpec getMaster() { | ||||
|         return master; | ||||
|     } | ||||
|  | ||||
|     public Set<HostSpec> getSalves() { | ||||
|         return salves; | ||||
|     } | ||||
|  | ||||
|     public Set<Properties> getProps() { | ||||
|         return props; | ||||
|     } | ||||
| } | ||||
| @ -9,6 +9,7 @@ package org.postgresql.core.v3; | ||||
| import org.postgresql.PGProperty; | ||||
| import org.postgresql.clusterchooser.ClusterStatus; | ||||
| import org.postgresql.clusterchooser.GlobalClusterStatusTracker; | ||||
| import org.postgresql.clusterhealthy.ClusterNodeCache; | ||||
| import org.postgresql.core.ConnectionFactory; | ||||
| import org.postgresql.core.PGStream; | ||||
| import org.postgresql.core.QueryExecutor; | ||||
| @ -111,7 +112,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private PGStream tryConnect(String user, String database, | ||||
|   public PGStream tryConnect(String user, String database, | ||||
|       Properties info, SocketFactory socketFactory, HostSpec hostSpec, | ||||
|       SslMode sslMode) | ||||
|       throws SQLException, IOException { | ||||
| @ -210,6 +211,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory { | ||||
|       ClusterSpec clusterSpec = clusterIter.next(); | ||||
|       HostSpec[] currentHostSpecs = clusterSpec.getHostSpecs(); | ||||
|  | ||||
|       if (currentHostSpecs.length > 1 && targetServerType == HostRequirement.master) { | ||||
|         ClusterNodeCache.checkHostSpecs(currentHostSpecs, info); | ||||
|       } | ||||
|       HostChooser hostChooser = | ||||
|               HostChooserFactory.createHostChooser(currentHostSpecs, targetServerType, info); | ||||
|       Iterator<CandidateHost> hostIter = hostChooser.iterator(); | ||||
| @ -317,6 +321,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory { | ||||
|           if (candidateHost.targetServerType != HostRequirement.any) { | ||||
|             hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Secondary; | ||||
|             LOGGER.info("Known status of host " + hostSpec + " is " + hostStatus); | ||||
|             if (hostStatus == HostStatus.Master) { | ||||
|               ClusterNodeCache.pushHostSpecs(hostSpec, currentHostSpecs, info); | ||||
|             } | ||||
|           } | ||||
|           GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus, info); | ||||
|           knownStates.put(hostSpec, hostStatus); | ||||
| @ -851,7 +858,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException { | ||||
|   public boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException { | ||||
|     String localRole = ""; | ||||
|     String dbState = ""; | ||||
|     List<byte[][]> results = SetupQueryRunner.runForList(queryExecutor, "select local_role, db_state from pg_stat_get_stream_replications();", true); | ||||
|  | ||||
| @ -0,0 +1,38 @@ | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.junit.Assert; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.postgresql.test.TestUtil; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.util.HashSet; | ||||
| import java.util.List; | ||||
| import java.util.Properties; | ||||
|  | ||||
| import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; | ||||
| import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getProperties; | ||||
|  | ||||
| public class ClusterHeartBeatFailureClusterTest { | ||||
|  | ||||
|     @Before | ||||
|     public void initDirver() throws Exception { | ||||
|         TestUtil.initDriver(); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void runTest() { | ||||
|         ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster = ClusterHeartBeatFailureCluster.getInstance(); | ||||
|         List<HostSpec> hostSpecs = getHostSpecs(); | ||||
|         HostSpec master = hostSpecs.get(0); | ||||
|         hostSpecs.remove(master); | ||||
|         HashSet<Properties> set = new HashSet<>(); | ||||
|         set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); | ||||
|         FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set); | ||||
|         clusterHeartBeatFailureCluster.addFailureCluster(failureCluster); | ||||
|         System.out.println(clusterHeartBeatFailureCluster.getFailureCluster()); | ||||
|         clusterHeartBeatFailureCluster.run(); | ||||
|         System.out.println(clusterHeartBeatFailureCluster.getFailureCluster()); | ||||
|         Assert.assertTrue(clusterHeartBeatFailureCluster.getFailureCluster().size() == 0); | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,35 @@ | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.junit.Assert; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.postgresql.test.TestUtil; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.util.HashSet; | ||||
| import java.util.List; | ||||
| import java.util.Properties; | ||||
|  | ||||
| import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; | ||||
|  | ||||
| public class ClusterHeartBeatFailureMasterTest { | ||||
|     @Before | ||||
|     public void initDirver() throws Exception { | ||||
|         TestUtil.initDriver(); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void runTest() { | ||||
|         ClusterHeartBeatFailureMaster instance = ClusterHeartBeatFailureMaster.getInstance(); | ||||
|         List<HostSpec> hostSpecs = getHostSpecs(); | ||||
|         instance.addFailureMaster(hostSpecs.get(0), hostSpecs.get(1)); | ||||
|         HashSet<Properties> set = new HashSet<>(); | ||||
|         set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); | ||||
|         instance.addProperties(hostSpecs.get(0), set); | ||||
|         instance.addProperties(hostSpecs.get(1), set); | ||||
|         Assert.assertTrue(instance.getFailureMaster().size() != 0); | ||||
|         instance.run(); | ||||
|         System.out.println(instance.getFailureMaster()); | ||||
|         Assert.assertTrue(instance.getFailureMaster().size() == 0); | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,40 @@ | ||||
|  | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
|  | ||||
| import org.junit.Assert; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.postgresql.test.TestUtil; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.util.HashSet; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
|  | ||||
| import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; | ||||
|  | ||||
| public class ClusterHeartBeatMasterTest { | ||||
|     @Before | ||||
|     public void initDirver() throws Exception { | ||||
|         TestUtil.initDriver(); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void runTest() { | ||||
|         ClusterHeartBeatMaster clusterHeartBeatMaster = ClusterHeartBeatMaster.getInstance(); | ||||
|         Map<HostSpec, Set<HostSpec>> clusterRelationship = clusterHeartBeatMaster.getClusterRelationship(); | ||||
|         List<HostSpec> hostSpecs = getHostSpecs(); | ||||
|         HostSpec master = hostSpecs.get(0); | ||||
|         hostSpecs.remove(master); | ||||
|         clusterRelationship.put(master, new HashSet<>(hostSpecs)); | ||||
|         HashSet<Properties> set = new HashSet<>(); | ||||
|         set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); | ||||
|         clusterHeartBeatMaster.addProperties(master, set); | ||||
|         clusterHeartBeatMaster.run(); | ||||
|         Assert.assertTrue(clusterRelationship.containsKey(master)); | ||||
|     } | ||||
|  | ||||
| } | ||||
| @ -0,0 +1,66 @@ | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.Properties; | ||||
|  | ||||
| import static java.util.stream.Collectors.joining; | ||||
|  | ||||
| public class ClusterHeartBeatUtil { | ||||
|     public static List<HostSpec> getHostSpecs() { | ||||
|         List<HostSpec> hostSpecList = new ArrayList<>(); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("server"), | ||||
|                 Integer.parseInt(System.getProperty("port")))); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"), | ||||
|                 Integer.parseInt(System.getProperty("secondaryPort")))); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"), | ||||
|                 Integer.parseInt(System.getProperty("secondaryServerPort2")))); | ||||
|         return hostSpecList; | ||||
|     } | ||||
|     public static String getUrl() { | ||||
|         List<String> list = new ArrayList<>(); | ||||
|         list.add(System.getProperty("server") + ":" + System.getProperty("port")); | ||||
|         list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort")); | ||||
|         list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2")); | ||||
|         String serverAndPort =  list.stream() | ||||
|                 .collect(joining(",")); | ||||
|         String database = getDatabase(); | ||||
|         return String.format("jdbc:postgresql://%s/%s", serverAndPort, database); | ||||
|     } | ||||
|  | ||||
|     public static String getDatabase() { | ||||
|         return System.getProperty("database"); | ||||
|     } | ||||
|  | ||||
|     public static String getUsername () { | ||||
|         return System.getProperty("username"); | ||||
|     } | ||||
|  | ||||
|     public static String getPassword() { | ||||
|         return System.getProperty("password"); | ||||
|     } | ||||
|  | ||||
|     public static Properties getProperties(List<HostSpec> hostSpecs) { | ||||
|  | ||||
|         Properties properties = new Properties(); | ||||
|         properties.put("user", getUsername()); | ||||
|         properties.put("password", getPassword()); | ||||
|         Properties info = new Properties(properties); | ||||
|         info.put("PGDBNAME", getDatabase()); | ||||
|         info.put("PGPORT", hostSpecs.stream() | ||||
|                 .map(o -> String.valueOf(o.getPort())) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGHOSTURL", hostSpecs.stream() | ||||
|                 .map(HostSpec::getHost) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGHOST", hostSpecs.stream() | ||||
|                 .map(HostSpec::getHost) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGPORTURL", hostSpecs.stream() | ||||
|                 .map(o -> String.valueOf(o.getPort())) | ||||
|                 .collect(joining(","))); | ||||
|         return info; | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,155 @@ | ||||
| package org.postgresql.clusterhealthy; | ||||
|  | ||||
| import org.junit.Assert; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.postgresql.test.TestUtil; | ||||
| import org.postgresql.util.HostSpec; | ||||
|  | ||||
| import java.sql.DriverManager; | ||||
| import java.sql.SQLException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.Set; | ||||
|  | ||||
| import static java.util.stream.Collectors.joining; | ||||
| import static org.postgresql.clusterhealthy.ClusterNodeCache.checkHostSpecs; | ||||
|  | ||||
| public class ClusterNodeCacheTest { | ||||
|  | ||||
|     private List<HostSpec> getHostSpecs() { | ||||
|         List<HostSpec> hostSpecList = new ArrayList<>(); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("server"), | ||||
|                 Integer.parseInt(System.getProperty("port")))); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"), | ||||
|                 Integer.parseInt(System.getProperty("secondaryPort")))); | ||||
|         hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"), | ||||
|                 Integer.parseInt(System.getProperty("secondaryServerPort2")))); | ||||
|         return hostSpecList; | ||||
|     } | ||||
|     private static String getUrl() { | ||||
|         List<String> list = new ArrayList<>(); | ||||
|         list.add(System.getProperty("server") + ":" + System.getProperty("port")); | ||||
|         list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort")); | ||||
|         list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2")); | ||||
|         String serverAndPort =  list.stream() | ||||
|                 .collect(joining(",")); | ||||
|         String database = getDatabase(); | ||||
|         return String.format("jdbc:postgresql://%s/%s", serverAndPort, database); | ||||
|     } | ||||
|  | ||||
|     private static String getDatabase() { | ||||
|         return System.getProperty("database"); | ||||
|     } | ||||
|  | ||||
|     private static String getUsername () { | ||||
|         return System.getProperty("username"); | ||||
|     } | ||||
|  | ||||
|     private static String getPassword() { | ||||
|         return System.getProperty("password"); | ||||
|     } | ||||
|  | ||||
|     private static Properties getProperties(List<HostSpec> hostSpecs) { | ||||
|  | ||||
|         Properties properties = new Properties(); | ||||
|         properties.put("user", getUsername()); | ||||
|         properties.put("password", getPassword()); | ||||
|         Properties info = new Properties(properties); | ||||
|         info.put("PGDBNAME", getDatabase()); | ||||
|         info.put("PGPORT", hostSpecs.stream() | ||||
|                 .map(o -> String.valueOf(o.getPort())) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGHOSTURL", hostSpecs.stream() | ||||
|                 .map(HostSpec::getHost) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGHOST", hostSpecs.stream() | ||||
|                 .map(HostSpec::getHost) | ||||
|                 .collect(joining(","))); | ||||
|         info.put("PGPORTURL", hostSpecs.stream() | ||||
|                 .map(o -> String.valueOf(o.getPort())) | ||||
|                 .collect(joining(","))); | ||||
|         return info; | ||||
|     } | ||||
|  | ||||
|     @Before | ||||
|     public void initDirver() throws Exception { | ||||
|         TestUtil.initDriver(); | ||||
|     } | ||||
|     @Test | ||||
|     public void getClusterConnection() throws SQLException { | ||||
| //        Class.forName("org.postgresql.Driver"); | ||||
|  | ||||
|         DriverManager.getConnection(getUrl(), getUsername(), getPassword()); | ||||
|  | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void testPeriodTime() throws Exception { | ||||
|         String url = getUrl() + "?targetServerType=master"; | ||||
|         DriverManager.getConnection(getUrl(), getUsername(), getPassword()); | ||||
|         String newUrl = url + "&heartbeatPeriod=3000"; | ||||
|         DriverManager.getConnection(newUrl, getUsername(), getPassword()); | ||||
|         Thread.sleep(10000L); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void testUnavailablePassword() throws ClassNotFoundException, SQLException, InterruptedException { | ||||
| //        Class.forName("org.postgresql.Driver"); | ||||
|         String url = getUrl() + "?targetServerType=master"; | ||||
|         DriverManager.getConnection(url, getUsername(), getPassword()); | ||||
|         HostSpec master = new HostSpec(System.getProperty("server"), | ||||
|                 Integer.parseInt(System.getProperty("port"))); | ||||
|         ClusterHeartBeatMaster instance = ClusterHeartBeatMaster.getInstance(); | ||||
|         Set<Properties> properties = instance.getProperties(master); | ||||
|         int before = properties.size(); | ||||
|         // Change password | ||||
|         Thread.sleep(10000L); | ||||
|  | ||||
|         Set<Properties> afterProperties = instance.getProperties(master); | ||||
|         int after = afterProperties.size(); | ||||
|  | ||||
|         Assert.assertNotEquals(after, before); | ||||
|  | ||||
|         Thread.sleep(10000L); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void testCheckReplacement() { | ||||
|         HostSpec master = new HostSpec(System.getProperty("server"), | ||||
|                 Integer.parseInt(System.getProperty("port"))); | ||||
|         Map<HostSpec, HostSpec> failureMap = ClusterHeartBeatFailureMaster.failureMap; | ||||
|         HostSpec node = new HostSpec("10.10.0.1", 2525); | ||||
|         failureMap.put(master, node); | ||||
|         List<HostSpec> hostSpecList = getHostSpecs(); | ||||
|         HostSpec[] hostSpecs = hostSpecList.toArray(new HostSpec[0]); | ||||
|         ClusterNodeCache.checkReplacement(hostSpecs); | ||||
|         boolean containsMaster = false; | ||||
|         boolean containsNode = false; | ||||
|         for (HostSpec hostSpec : hostSpecs) { | ||||
|             if (hostSpec.equals(master)) { | ||||
|                 containsMaster = true; | ||||
|             } | ||||
|             if (hostSpec.equals(node)) { | ||||
|                 containsNode = true; | ||||
|             } | ||||
|         } | ||||
|         Assert.assertTrue(containsNode); | ||||
|         Assert.assertFalse(containsMaster); | ||||
|     } | ||||
|  | ||||
|     @Test | ||||
|     public void testpushHostSpecs() throws Exception { | ||||
|  | ||||
|         List<HostSpec> hostSpecs = getHostSpecs(); | ||||
|         Properties info = getProperties(hostSpecs); | ||||
|         checkHostSpecs(hostSpecs.toArray(new HostSpec[0]), info); | ||||
|         System.out.println(ClusterNodeCache.isOpen()); | ||||
|         Thread.sleep(30000); | ||||
|     } | ||||
|  | ||||
|  | ||||
|  | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 chen-czywj
					chen-czywj