diff --git a/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java b/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java index 41949db..07252bd 100644 --- a/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java +++ b/pgjdbc/src/main/java/org/postgresql/GlobalConnectionTracker.java @@ -5,7 +5,9 @@ import org.postgresql.jdbc.PgConnection; import org.postgresql.log.Log; import org.postgresql.log.Logger; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -26,6 +28,16 @@ public class GlobalConnectionTracker { return PGProperty.FORCE_TARGET_SERVER_SLAVE.getBoolean(props) && ("slave".equals(PGProperty.TARGET_SERVER_TYPE.get(props)) || "secondary".equals(PGProperty.TARGET_SERVER_TYPE.get(props))); } + + /** + * + * @param props the parsed/defaulted connection properties + * @return + */ + private static boolean isTargetServerMaster(Properties props) { + return "master".equals(PGProperty.TARGET_SERVER_TYPE.get(props)); + } + /** * Store the actual query executor and connection host spec. * @@ -33,7 +45,9 @@ public class GlobalConnectionTracker { * @param queryExecutor */ public static void possessConnectionReference(QueryExecutor queryExecutor, Properties props) { - if(!isForceTargetServerSlave(props)) return; + if (!isForceTargetServerSlave(props) || !isTargetServerMaster(props)) { + return; + } int identityQueryExecute = System.identityHashCode(queryExecutor); String hostSpec = queryExecutor.getHostSpec().toString(); synchronized (connectionManager) { @@ -53,7 +67,9 @@ public class GlobalConnectionTracker { * @param queryExecutor */ public static void releaseConnectionReference(QueryExecutor queryExecutor, Properties props) { - if(!isForceTargetServerSlave(props)) return; + if (!isForceTargetServerSlave(props)) { + return; + } String hostSpec = queryExecutor.getHostSpec().toString(); int identityQueryExecute = System.identityHashCode(queryExecutor); synchronized (connectionManager) { @@ -93,4 +109,46 @@ public class GlobalConnectionTracker { } } + /** + * Close all existing connections under the specified host. + * + * @param hostSpec ip and port. + */ + public static void closeConnectionOfCrash(String hostSpec) { + synchronized (connectionManager) { + HashMap hostConnection = connectionManager.getOrDefault(hostSpec, null); + if (hostConnection != null) { + LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, start to close the original connection."); + for (QueryExecutor queryExecutor : hostConnection.values()) { + if (queryExecutor != null && !queryExecutor.isClosed()) { + queryExecutor.close(); + queryExecutor.setAvailability(false); + } + } + hostConnection.clear(); + LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, end to close the original connection."); + } + } + } + + /** + * get all existing connections under the specified host. + * + * @param hostSpec ip and port. + */ + public static List getConnections(String hostSpec) { + synchronized (connectionManager) { + List ret = new ArrayList<>(); + HashMap hostConnection = connectionManager.getOrDefault(hostSpec, null); + if (hostConnection != null) { + for (Map.Entry queryExecutorEntry : hostConnection.entrySet()) { + ret.add(queryExecutorEntry.getValue()); + } + } + return ret; + } + } + + + } \ No newline at end of file diff --git a/pgjdbc/src/main/java/org/postgresql/PGProperty.java b/pgjdbc/src/main/java/org/postgresql/PGProperty.java index dced34c..4675e42 100644 --- a/pgjdbc/src/main/java/org/postgresql/PGProperty.java +++ b/pgjdbc/src/main/java/org/postgresql/PGProperty.java @@ -198,12 +198,12 @@ public enum PGProperty { */ SSL_MODE("sslmode", null, "Parameter governing the use of SSL", false, "disable", "allow", "prefer", "require", "verify-ca", "verify-full"), - + /** * Context of SSL(SSLContext.getInstance("@code")): empty for TLS,valid values{SSL/SSLv2/SSLv3/TLS/TLSv1/TLSv1.1/TLSv1.2} */ SSL_CONTEXT("sslcontext", null, "Control use of SSL Context(SSL, TLS, TLSv1.2, etc)"), - + /** * Classname of the SSL Factory to use (instance of {@code javax.net.ssl.SSLSocketFactory}). */ @@ -252,7 +252,7 @@ public enum PGProperty { SSL_PRIVATEKEY_FACTORY("sslprivatekeyfactory", null, "The privatekey factory for the client's ssl"), - + /** * The classname instantiating {@code javax.security.auth.callback.CallbackHandler} to use. */ @@ -287,7 +287,7 @@ public enum PGProperty { * value of zero means that it is disabled. */ SOCKET_TIMEOUT("socketTimeout", "0", "The timeout value used for socket read operations."), - + /** * The timeout value used for socket read operations when jdbc connecting. If reading from the server takes longer than * this value, the connection is closed. This can be used as both a brute force global query @@ -339,7 +339,7 @@ public enum PGProperty { * The application name (require server version >= 9.0). */ APPLICATION_NAME("ApplicationName", DriverInfo.DRIVER_NAME, "Name of the Application (backend >= 9.0)"), - + APPLICATION_TYPE("ApplicationType", null, "Application Type"), /** @@ -532,6 +532,11 @@ public enum PGProperty { "", "Factory class to instantiate factories for XML processing"), + /** + * It is used to detect the thread interval of the survival task on the primary node in the high availability scenario. + */ + HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time") + ; private String _name; diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java new file mode 100644 index 0000000..5b9c719 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeat.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.PGProperty; +import org.postgresql.core.PGStream; +import org.postgresql.core.QueryExecutor; +import org.postgresql.core.SocketFactoryFactory; +import org.postgresql.core.v3.ConnectionFactoryImpl; +import org.postgresql.core.v3.QueryExecutorImpl; +import org.postgresql.jdbc.SslMode; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.HostSpec; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.postgresql.GlobalConnectionTracker.closeConnectionOfCrash; +import static org.postgresql.GlobalConnectionTracker.getConnections; +import static org.postgresql.clusterhealthy.ClusterNodeCache.isOpen; +import static org.postgresql.util.PSQLState.CONNECTION_REJECTED; + +/** + * Heartbeat detection, active detection of the primary node, and maintenance of cache after failure The next + * contest pair active detection of the failed node, detection of the cluster where the primary node has failed, + * and maintenance of the latest information to the cache set。 + */ +public class ClusterHeartBeat { + + public static final Map> CLUSTER_PROPERTIES = new ConcurrentHashMap<>(); + private static Log LOGGER = Logger.getLogger(ClusterHeartBeat.class.getName()); + private static final ConnectionFactoryImpl FACTORY = new ConnectionFactoryImpl(); + private static final String UPDATE_TIME = "time"; + private volatile Long periodTime = 5000L; + + + /** + * heartbeat task thread. + */ + public void masterNodeProbe() { + while (isOpen()) { + LOGGER.debug("heartBeat thread start time: " + new Date(System.currentTimeMillis())); + // failed node detection + ClusterHeartBeatFailureMaster.getInstance().run(); + + // active primary node detection + ClusterHeartBeatMaster.getInstance().run(); + // The failed cluster seeks the primary node + ClusterHeartBeatFailureCluster.getInstance().run(); + try { + Thread.sleep(periodTime); + } catch (InterruptedException e) { + LOGGER.debug(e.getStackTrace()); + } + } + periodTime = 5000L; + } + + /** + * get parsed/defaulted connection properties + * @param hostSpec ip and port + * @return properties set + */ + public Set getProperties(HostSpec hostSpec) { + synchronized (CLUSTER_PROPERTIES) { + return CLUSTER_PROPERTIES.computeIfAbsent(hostSpec, k -> new HashSet<>()); + } + } + + public Map> getClusterRelationship () { + return ClusterHeartBeatMaster.getInstance().getClusterRelationship(); + } + + /** + * Adding Cluster Information + * @param key master + * @param value secondary list + * @param properties the parsed/defaulted connection properties + */ + public void addNodeRelationship (HostSpec key, HostSpec[] value, Properties properties) { + // update node host and ip + addClusterNode(key, value); + // update node properties + addProperties(key, Collections.singleton(properties)); + if (PGProperty.HEARTBEAT_PERIOD.get(properties) != null) { + String period = PGProperty.HEARTBEAT_PERIOD.get(properties); + long time = Long.parseLong(period); + synchronized (UPDATE_TIME) { + if (time > 0) { + periodTime = Math.min(periodTime, time); + } + } + } + } + + /** + * + * @param hostSpec ip and port + * @param properties the parsed/defaulted connection properties + */ + public void addProperties(HostSpec hostSpec, Set properties) { + synchronized (CLUSTER_PROPERTIES) { + Set propertiesSet = CLUSTER_PROPERTIES.get(hostSpec); + if (propertiesSet == null) { + propertiesSet = new HashSet<>(); + } + propertiesSet.addAll(properties); + CLUSTER_PROPERTIES.put(hostSpec, propertiesSet); + } + } + + /** + * Update the cluster node relationship + * @param key old master + * @param newKey new master + * @param slaves slaves set + */ + public void removeClusterNode(HostSpec key, HostSpec newKey, Set slaves) { + ClusterHeartBeatMaster.getInstance().removeClusterNode(key, newKey, slaves); + + } + + /** + * Perform cache maintenance on cluster nodes connected to hosts + * @param hostSpecs the parsed/defaulted connection properties + * @param value slaves set + */ + public void addClusterNode(HostSpec hostSpecs, HostSpec... value) { + ClusterHeartBeatMaster.getInstance().addClusterNode(hostSpecs, value); + } + + /** + * Get the cluster slave node + * @param hostSpec the parsed/defaulted connection properties + * @return slaves set + */ + public Set getClusterSalveNode(HostSpec hostSpec) { + return ClusterHeartBeatMaster.getInstance().getClusterSalveNode(hostSpec); + } + + /** + * + * @param hostSpec ip and port + * @param properties the parsed/defaulted connection properties + */ + public void removeProperties(HostSpec hostSpec, Properties properties) { + synchronized (CLUSTER_PROPERTIES) { + Set propertiesSet = CLUSTER_PROPERTIES.getOrDefault(hostSpec, null); + if (propertiesSet != null) { + propertiesSet.remove(properties); + CLUSTER_PROPERTIES.put(hostSpec, propertiesSet); + } + } + } + + /** + * the node probes the activity by reflecting the tryConnect() method. + * + * @param hostSpec ip and port. + * @param propSet the parsed/defaulted connection properties + * @return QueryExecutor + * @throws SQLException new sql exception + */ + public QueryExecutor getQueryExecutor(HostSpec hostSpec, Set propSet) throws SQLException { + Properties props = null; + try { + for (Properties properties : propSet) { + props = properties; + SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(props); + SslMode sslMode = SslMode.of(props); + String user = props.getProperty("user", ""); + String database = props.getProperty("PGDBNAME", ""); + PGStream pgStream = FACTORY.tryConnect(user, database, props, socketFactory, hostSpec, sslMode); + return new QueryExecutorImpl(pgStream, user, database, + 1000, props); + } + } catch (SQLException e) { + String sqlState = e.getSQLState(); + if (CONNECTION_REJECTED.getState().equals(sqlState) || "28P01".equals(sqlState)) { + LOGGER.debug("node " + hostSpec + " is active, and connenction authentication fails."); + LOGGER.debug("remove before propSet size :" + propSet.size()); + removeProperties(hostSpec, props); + LOGGER.debug("remove after propSet size :" + propSet.size()); + } + + LOGGER.debug("acquire QueryExecutor failure " + e.getMessage()); + } catch (IOException e) { + LOGGER.debug("acquire QueryExecutor failure " + e.getMessage()); + LOGGER.debug(e.getCause()); + } + throw new SQLException(); + } + + /** + * Check whether the node is the primary node + * + * @param queryExecutor queryExector + * @return true/false + */ + public boolean nodeRoleIsMaster(QueryExecutor queryExecutor) { + try { + return FACTORY.isMaster(queryExecutor); + } catch (SQLException | IOException e) { + LOGGER.debug("Error obtaining node role " + e.getMessage()); + LOGGER.debug(e.getStackTrace()); + return false; + } + } + + /** + * Post-processing after the primary node fails + * + * @param hostSpec master ip and port + * @param slaves slaves set + * @param props the parsed/defaulted connection properties + */ + public void cacheProcess(HostSpec hostSpec, Set slaves, Set props) { + HostSpec maseterNode = findMasterNode(slaves, props); + removeClusterNode(hostSpec, maseterNode, slaves); + if (maseterNode != null) { + addProperties(maseterNode, props); + ClusterHeartBeatFailureMaster.getInstance().addFailureMaster(hostSpec, maseterNode); + } else { + FailureCluster cluster = new FailureCluster(hostSpec, slaves, props); + ClusterHeartBeatFailureCluster.getInstance().addFailureCluster(cluster); + } + closeConnectionOfCrash(hostSpec.toString()); + } + + /** + * Locate the host on the standby computer + * + * @param hostSpecSet slaves set + * @param properties the parsed/defaulted connection properties + * @return new master node + */ + public HostSpec findMasterNode(Set hostSpecSet, Set properties) { + for (HostSpec hostSpec : hostSpecSet) { + List queryExecutorList = getConnections(hostSpec.toString()); + for (QueryExecutor executor : queryExecutorList) { + if (!executor.isClosed() && nodeRoleIsMaster(executor)) { + return hostSpec; + } + } + QueryExecutor queryExecutor = null; + try { + queryExecutor = getQueryExecutor(hostSpec, properties); + } catch (SQLException e) { + continue; + } + boolean isMaster = nodeRoleIsMaster(queryExecutor); + if (isMaster) { + return hostSpec; + } + } + return null; + } + +} diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java new file mode 100644 index 0000000..ba9ece7 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureCluster.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.core.QueryExecutor; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.HostSpec; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * After the primary node breaks down during heartbeat maintenance, + * the cluster is in the no-host state. This section describes how to detect hosts in a cluster without hosts. + */ +public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{ + + public static List failureCluster = new ArrayList<>(); + private volatile static ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster; + private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureCluster.class.getName()); + private ClusterHeartBeatFailureCluster() { + + } + + public static synchronized ClusterHeartBeatFailureCluster getInstance() { + if (clusterHeartBeatFailureCluster == null) { + clusterHeartBeatFailureCluster = new ClusterHeartBeatFailureCluster(); + } + return clusterHeartBeatFailureCluster; + } + + /** + * After the active node fails, find a new active node on the standby node or check whether the active node is resurrected + */ + public void run() { + if (failureCluster.isEmpty()) { + return; + } + List list = new ArrayList<>(failureCluster); + failureCluster.clear(); + LOGGER.debug("cluster does not have a master node" + list); + for (FailureCluster cluster : list) { + QueryExecutor queryExecutor = null; + try { + if (cluster == null || cluster.getMaster() == null) { + continue; + } + queryExecutor = getQueryExecutor(cluster.getMaster(), cluster.getProps()); + } catch (SQLException e) { + Set salves = cluster.getSalves(); + int count = 0; + for (HostSpec salf : salves) { + try { + getQueryExecutor(salf, cluster.getProps()); + } catch (SQLException ex) { + count++; + } + } + if (count == salves.size()) { + continue; + } + cacheProcess(cluster.getMaster(), salves, cluster.getProps()); + } + if (queryExecutor != null) { + boolean isMaster = nodeRoleIsMaster(queryExecutor); + if (isMaster) { + addClusterNode(cluster.getMaster(), cluster.getSalves().toArray(new HostSpec[0])); + addProperties(cluster.getMaster(), cluster.getProps()); + } else { + + HostSpec maseterNode = findMasterNode(cluster.getSalves(), cluster.getProps()); + if (maseterNode != null) { + addProperties(maseterNode, cluster.getProps()); + Set salves = cluster.getSalves(); + salves.add(cluster.getMaster()); + removeClusterNode(cluster.getMaster(), maseterNode, salves); + } + } + } + } + } + + public void addFailureCluster(FailureCluster cluster) { + failureCluster.add(cluster); + } + + public List getFailureCluster() { + return failureCluster; + } + +} diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMaster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMaster.java new file mode 100644 index 0000000..2c94d8a --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMaster.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.core.QueryExecutor; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.HostSpec; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * If only one host in the cluster is down, and a new host is selected, + * the relationship between the faulty host and the new host needs to be maintained. + * When the faulty host recovers, it needs to be added to the node again for maintenance + */ +public class ClusterHeartBeatFailureMaster extends ClusterHeartBeat{ + + public static Map failureMap = new ConcurrentHashMap<>(); + private volatile static ClusterHeartBeatFailureMaster clusterHeartBeatFailureMaster; + private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureMaster.class.getName()); + private ClusterHeartBeatFailureMaster() { + + } + + public static synchronized ClusterHeartBeatFailureMaster getInstance() { + if (clusterHeartBeatFailureMaster == null) { + clusterHeartBeatFailureMaster = new ClusterHeartBeatFailureMaster(); + } + return clusterHeartBeatFailureMaster; + } + + /** + * If the failed node is alive, join cache maintenance + */ + public void run() { + HashMap failureMapClone = new HashMap<>(failureMap); + LOGGER.debug("failure node " + failureMapClone); + for (Map.Entry next : failureMapClone.entrySet()) { + HostSpec key = next.getKey(); + HostSpec value = next.getValue(); + Set properties = getProperties(key); + QueryExecutor queryExecutor = null; + try { + queryExecutor = getQueryExecutor(key, properties); + failureMap.remove(key); + } catch (SQLException e) { + LOGGER.error(key.toString() + " tryConnect failure."); + continue; + } + boolean isMaster = nodeRoleIsMaster(queryExecutor); + if (isMaster) { + HostSpec current = value; + while (failureMap.containsKey(current)) { + current = failureMap.get(current); + } + if (getClusterRelationship().containsKey(current)) { + Set prop = getProperties(key); + boolean currentIsMaster; + try { + QueryExecutor currentQueryExecutor = getQueryExecutor(current, prop); + currentIsMaster = nodeRoleIsMaster(currentQueryExecutor); + } catch (SQLException e) { + currentIsMaster = false; + } + if (!currentIsMaster) { + Set set = getClusterSalveNode(current); + set.add(current); + addClusterNode(key, set.toArray(new HostSpec[0])); + } + } + + } else { + HostSpec current = value; + while (failureMap.containsKey(current)) { + if (current == failureMap.get(current)) { + failureMap.remove(current); + break; + } + current = failureMap.get(current); + } + addClusterNode(current, key); + } + } + } + + public void addFailureMaster(HostSpec hostSpec, HostSpec maseterNode) { + failureMap.put(hostSpec, maseterNode); + } + + public Map getFailureMaster() { + return failureMap; + } + + public void remove(HostSpec hostSpec) { + failureMap.remove(hostSpec); + } + +} diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java new file mode 100644 index 0000000..5b8717c --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterHeartBeatMaster.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.core.QueryExecutor; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.HostSpec; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.postgresql.GlobalConnectionTracker.getConnections; + +/** + * Maintain the primary node of the cluster. When the primary node breaks down, + * you need to detect a new host and bind the faulty host to the new host to facilitate + * quick switchover when the failed node is connected + */ +public class ClusterHeartBeatMaster extends ClusterHeartBeat { + + public static final Map> CLUSTER_NODE_RELATIONSHIP = new ConcurrentHashMap<>(); + private volatile static ClusterHeartBeatMaster ClusterHeartBeatMaster; + private static Log LOGGER = Logger.getLogger(ClusterHeartBeatMaster.class.getName()); + + private ClusterHeartBeatMaster() { + + } + + public static synchronized ClusterHeartBeatMaster getInstance() { + if (ClusterHeartBeatMaster == null) { + ClusterHeartBeatMaster = new ClusterHeartBeatMaster(); + } + return ClusterHeartBeatMaster; + } + + /** + * the primary node is active and added to the failure set after failure + */ + public void run() { + Iterator>> iterator = CLUSTER_NODE_RELATIONSHIP.entrySet().iterator(); + LOGGER.debug("master nodes " + CLUSTER_NODE_RELATIONSHIP); + while (iterator.hasNext()) { + Map.Entry> nodeMap = iterator.next(); + HostSpec master = nodeMap.getKey(); + Set slaves = nodeMap.getValue(); + LOGGER.debug("Current node " + master + " Standby node " + slaves); + QueryExecutor queryExecutor = null; + List queryExecutorList = getConnections(master.toString()); + for (QueryExecutor executor : queryExecutorList) { + if (!executor.isClosed()) { + queryExecutor = executor; + break; + } + } + + Set propertiesSet = getProperties(master); + if (queryExecutor == null) { + try { + queryExecutor = super.getQueryExecutor(master, propertiesSet); + } catch (SQLException e) { + LOGGER.debug("acquire QueryExecutor failure"); + super.cacheProcess(master, slaves, propertiesSet); + continue; + } + } + LOGGER.debug("Information about the current connected node " + queryExecutor.getSocketAddress()); + if (!super.nodeRoleIsMaster(queryExecutor)) { + LOGGER.debug(master + ":The host is degraded to the standby server."); + super.cacheProcess(master, slaves, propertiesSet); + } + } + } + + public Map> getClusterRelationship() { + return CLUSTER_NODE_RELATIONSHIP; + } + + public void addClusterNode(HostSpec hostSpecs, HostSpec... value) { + synchronized (CLUSTER_NODE_RELATIONSHIP) { + Set hostSpecSet = CLUSTER_NODE_RELATIONSHIP.computeIfAbsent(hostSpecs, k -> new HashSet<>()); + Arrays.stream(value) + .filter(host -> !host.equals(hostSpecs)) + .forEach(hostSpecSet::add); + CLUSTER_NODE_RELATIONSHIP.put(hostSpecs, hostSpecSet); + } + } + + public void removeClusterNode(HostSpec key, HostSpec newKey, Set slaves) { + synchronized (CLUSTER_NODE_RELATIONSHIP) { + CLUSTER_NODE_RELATIONSHIP.remove(key); + if (newKey != null) { + Set hostSpecSet = CLUSTER_NODE_RELATIONSHIP.get(newKey); + if (hostSpecSet == null) { + hostSpecSet = new HashSet<>(); + } + hostSpecSet.addAll(slaves); + hostSpecSet.remove(newKey); + CLUSTER_NODE_RELATIONSHIP.put(newKey, hostSpecSet); + } + } + } + + public Set get(HostSpec hostSpec) { + synchronized (CLUSTER_NODE_RELATIONSHIP) { + return CLUSTER_NODE_RELATIONSHIP.get(hostSpec); + } + } + + +} diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java new file mode 100644 index 0000000..413bc7a --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/ClusterNodeCache.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.PGProperty; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * Cluster information processing, cache connection information, whether to start the heartbeat thread + */ +public class ClusterNodeCache { + private static volatile boolean status; + private static final String STATUS_LOCK = "status"; + private static Log LOGGER = Logger.getLogger(ClusterNodeCache.class.getName()); + private static final ExecutorService executorService = Executors.newCachedThreadPool(); + private static final ClusterHeartBeat CLUSTER_HEART_BEAT = new ClusterHeartBeat(); + + public static boolean isOpen() { + return status; + } + + /** + * The faulty host is switched to a new host + * @param hostSpecs the parsed/defaulted connection properties + */ + public static void checkReplacement(HostSpec[] hostSpecs) { + if (hostSpecs == null) { + return; + } + ClusterHeartBeatFailureMaster failureMaster = ClusterHeartBeatFailureMaster.getInstance(); + Map failureMap = failureMaster.getFailureMaster(); + for (int i = 0; i < hostSpecs.length; i++) { + while (failureMap.containsKey(hostSpecs[i])) { + if (hostSpecs[i] == failureMap.get(hostSpecs[i])) { + failureMaster.remove(hostSpecs[i]); + return; + } + hostSpecs[i] = failureMap.get(hostSpecs[i]); + } + } + } + + /** + * Verify parameters and replace the failed primary node + * @param hostSpecs cluster node + * @param properties the parsed/defaulted connection properties + */ + public static void checkHostSpecs(HostSpec[] hostSpecs, Properties properties) throws PSQLException { + // check the interval of heartbeat threads. + Set set = Arrays.stream(hostSpecs) + .collect(Collectors.toSet()); + if (set.size() > 1) { + checkReplacement(hostSpecs); + } + } + + /** + * + * @param master master node + * @param hostSpecs cluster node + * @param properties the parsed/defaulted connection properties + */ + public static void pushHostSpecs(HostSpec master, HostSpec[] hostSpecs, Properties properties) { + Set set = Arrays.stream(hostSpecs) + .collect(Collectors.toSet()); + String period = PGProperty.HEARTBEAT_PERIOD.get(properties); + boolean open = true; + if (!isNumeric(period)) { + open = false; + LOGGER.info("Invalid heartbeatPeriod value: " + period); + } + long timePeriod = Long.parseLong(period); + if (timePeriod <= 0) { + open = false; + LOGGER.info("Invalid heartbeatPeriod value: " + period); + } + if (set.size() > 1 && open) { + CLUSTER_HEART_BEAT.addNodeRelationship(master, hostSpecs, properties); + start(); + } + } + + private static void start() { + if (status) { + LOGGER.info("heartbeat thread ----> started"); + return; + } + synchronized (STATUS_LOCK) { + if (status) { + LOGGER.info("heartbeat thread ----> started"); + return; + } + status = true; + } + executorService.execute(CLUSTER_HEART_BEAT::masterNodeProbe); + } + + public static void stop() { + status = false; + } + + private static boolean isNumeric(final CharSequence cs) { + if (cs.length() == 0) { + return false; + } + final int size = cs.length(); + for (int i = 0; i < size; i++) { + if (!Character.isDigit(cs.charAt(i))) { + return false; + } + } + return true; + } + + +} diff --git a/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java new file mode 100644 index 0000000..9af78d1 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/clusterhealthy/FailureCluster.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.clusterhealthy; + +import org.postgresql.util.HostSpec; + +import java.util.Properties; +import java.util.Set; + +/** + * Cluster information instance, including primary and secondary nodes and connection information + */ +public class FailureCluster { + private HostSpec master; + private Set salves; + private Set props; + + /** + * + * @param master Current master node + * @param salves Slave set + * @param props Connection information + */ + public FailureCluster(HostSpec master, Set salves, Set props) { + this.master = master; + this.salves = salves; + this.props = props; + } + + public void setSalves(Set salves) { + this.salves = salves; + } + + public void setProps(Set props) { + this.props = props; + } + + public void setMaster(HostSpec master) { + this.master = master; + } + + public HostSpec getMaster() { + return master; + } + + public Set getSalves() { + return salves; + } + + public Set getProps() { + return props; + } +} diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java index 50aa32c..95d175b 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java +++ b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java @@ -9,6 +9,7 @@ package org.postgresql.core.v3; import org.postgresql.PGProperty; import org.postgresql.clusterchooser.ClusterStatus; import org.postgresql.clusterchooser.GlobalClusterStatusTracker; +import org.postgresql.clusterhealthy.ClusterNodeCache; import org.postgresql.core.ConnectionFactory; import org.postgresql.core.PGStream; import org.postgresql.core.QueryExecutor; @@ -58,7 +59,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { private static final int AUTH_REQ_GSS = 7; private static final int AUTH_REQ_GSS_CONTINUE = 8; private static final int AUTH_REQ_SSPI = 9; - + public static String CLIENT_ENCODING = "UTF8"; public static String USE_BOOLEAN = "false"; private static final int AUTH_REQ_SHA256 = 10; @@ -98,7 +99,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { setStaticUseBoolean(useBoolean); } - + private void setSocketTimeout(PGStream stream, Properties info, PGProperty propKey) throws SQLException, IOException { // Set the socket timeout if the "socketTimeout" property has been set. int socketTimeout = Integer.parseInt(propKey.getDefaultValue()); @@ -112,14 +113,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory { } } - private PGStream tryConnect(String user, String database, + public PGStream tryConnect(String user, String database, Properties info, SocketFactory socketFactory, HostSpec hostSpec, SslMode sslMode) throws SQLException, IOException { int connectTimeout = Integer.parseInt(PGProperty.CONNECT_TIMEOUT.getDefaultValue()); if (PGProperty.CONNECT_TIMEOUT.getInt(info) <= Integer.MAX_VALUE / 1000) { connectTimeout = PGProperty.CONNECT_TIMEOUT.getInt(info) * 1000; - } else { + } else { LOGGER.debug("integer connectTimeout is too large, it will occur error after multiply by 1000."); } @@ -211,6 +212,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory { ClusterSpec clusterSpec = clusterIter.next(); HostSpec[] currentHostSpecs = clusterSpec.getHostSpecs(); + if (currentHostSpecs.length > 1 && targetServerType == HostRequirement.master) { + ClusterNodeCache.checkHostSpecs(currentHostSpecs, info); + } HostChooser hostChooser = HostChooserFactory.createHostChooser(currentHostSpecs, targetServerType, info); Iterator hostIter = hostChooser.iterator(); @@ -326,6 +330,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory { if (candidateHost.targetServerType != HostRequirement.any) { hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Secondary; LOGGER.info("Known status of host " + hostSpec + " is " + hostStatus); + if (hostStatus == HostStatus.Master) { + ClusterNodeCache.pushHostSpecs(hostSpec, currentHostSpecs, info); + } } GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus, info); knownStates.put(hostSpec, hostStatus); @@ -588,7 +595,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { else if(this.protocolVerion == PROTOCOL_VERSION_350) pgStream.sendInteger2(50); // protocol minor else if(this.protocolVerion == PROTOCOL_VERSION_351) - pgStream.sendInteger2(51); // protocol minor + pgStream.sendInteger2(51); // protocol minor for (byte[] encodedParam : encodedParams) { pgStream.send(encodedParam); pgStream.sendChar(0); @@ -855,8 +862,8 @@ public class ConnectionFactoryImpl extends ConnectionFactory { Utils.escapeLiteral(sql, appName, queryExecutor.getStandardConformingStrings()); sql.append("'"); SetupQueryRunner.run(queryExecutor, sql.toString(), false); - } - + } + String appType = PGProperty.APPLICATION_TYPE.get(info); if (appType !=null && !appType.equals(queryExecutor.getApplicationType())) { StringBuilder sql = new StringBuilder(); @@ -867,7 +874,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { } } - private boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException { + public boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException { String localRole = ""; String dbState = ""; List results = SetupQueryRunner.runForList(queryExecutor, "select local_role, db_state from pg_stat_get_stream_replications();", true); @@ -885,8 +892,8 @@ public class ConnectionFactoryImpl extends ConnectionFactory { String datcompatibility = queryExecutor.getEncoding().decode(result[0]); return datcompatibility == null ? "PG" : datcompatibility; } - - + + private String queryGaussdbVersion(QueryExecutor queryExecutor) throws SQLException, IOException { byte[][] result = SetupQueryRunner.run(queryExecutor, "select version();", true); String version = queryExecutor.getEncoding().decode(result[0]); diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java new file mode 100644 index 0000000..b787f1b --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureClusterTest.java @@ -0,0 +1,38 @@ +package org.postgresql.clusterhealthy; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; + +import java.util.HashSet; +import java.util.List; +import java.util.Properties; + +import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; +import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getProperties; + +public class ClusterHeartBeatFailureClusterTest { + + @Before + public void initDirver() throws Exception { + TestUtil.initDriver(); + } + + @Test + public void runTest() { + ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster = ClusterHeartBeatFailureCluster.getInstance(); + List hostSpecs = getHostSpecs(); + HostSpec master = hostSpecs.get(0); + hostSpecs.remove(master); + HashSet set = new HashSet<>(); + set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); + FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set); + clusterHeartBeatFailureCluster.addFailureCluster(failureCluster); + System.out.println(clusterHeartBeatFailureCluster.getFailureCluster()); + clusterHeartBeatFailureCluster.run(); + System.out.println(clusterHeartBeatFailureCluster.getFailureCluster()); + Assert.assertTrue(clusterHeartBeatFailureCluster.getFailureCluster().size() == 0); + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMasterTest.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMasterTest.java new file mode 100644 index 0000000..77512f3 --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatFailureMasterTest.java @@ -0,0 +1,35 @@ +package org.postgresql.clusterhealthy; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; + +import java.util.HashSet; +import java.util.List; +import java.util.Properties; + +import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; + +public class ClusterHeartBeatFailureMasterTest { + @Before + public void initDirver() throws Exception { + TestUtil.initDriver(); + } + + @Test + public void runTest() { + ClusterHeartBeatFailureMaster instance = ClusterHeartBeatFailureMaster.getInstance(); + List hostSpecs = getHostSpecs(); + instance.addFailureMaster(hostSpecs.get(0), hostSpecs.get(1)); + HashSet set = new HashSet<>(); + set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); + instance.addProperties(hostSpecs.get(0), set); + instance.addProperties(hostSpecs.get(1), set); + Assert.assertTrue(instance.getFailureMaster().size() != 0); + instance.run(); + System.out.println(instance.getFailureMaster()); + Assert.assertTrue(instance.getFailureMaster().size() == 0); + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatMasterTest.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatMasterTest.java new file mode 100644 index 0000000..ddd46ee --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatMasterTest.java @@ -0,0 +1,40 @@ + +package org.postgresql.clusterhealthy; + + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs; + +public class ClusterHeartBeatMasterTest { + @Before + public void initDirver() throws Exception { + TestUtil.initDriver(); + } + + @Test + public void runTest() { + ClusterHeartBeatMaster clusterHeartBeatMaster = ClusterHeartBeatMaster.getInstance(); + Map> clusterRelationship = clusterHeartBeatMaster.getClusterRelationship(); + List hostSpecs = getHostSpecs(); + HostSpec master = hostSpecs.get(0); + hostSpecs.remove(master); + clusterRelationship.put(master, new HashSet<>(hostSpecs)); + HashSet set = new HashSet<>(); + set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs())); + clusterHeartBeatMaster.addProperties(master, set); + clusterHeartBeatMaster.run(); + Assert.assertTrue(clusterRelationship.containsKey(master)); + } + +} diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatUtil.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatUtil.java new file mode 100644 index 0000000..58535ee --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterHeartBeatUtil.java @@ -0,0 +1,66 @@ +package org.postgresql.clusterhealthy; + +import org.postgresql.util.HostSpec; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static java.util.stream.Collectors.joining; + +public class ClusterHeartBeatUtil { + public static List getHostSpecs() { + List hostSpecList = new ArrayList<>(); + hostSpecList.add(new HostSpec(System.getProperty("server"), + Integer.parseInt(System.getProperty("port")))); + hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"), + Integer.parseInt(System.getProperty("secondaryPort")))); + hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"), + Integer.parseInt(System.getProperty("secondaryServerPort2")))); + return hostSpecList; + } + public static String getUrl() { + List list = new ArrayList<>(); + list.add(System.getProperty("server") + ":" + System.getProperty("port")); + list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort")); + list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2")); + String serverAndPort = list.stream() + .collect(joining(",")); + String database = getDatabase(); + return String.format("jdbc:postgresql://%s/%s", serverAndPort, database); + } + + public static String getDatabase() { + return System.getProperty("database"); + } + + public static String getUsername () { + return System.getProperty("username"); + } + + public static String getPassword() { + return System.getProperty("password"); + } + + public static Properties getProperties(List hostSpecs) { + + Properties properties = new Properties(); + properties.put("user", getUsername()); + properties.put("password", getPassword()); + Properties info = new Properties(properties); + info.put("PGDBNAME", getDatabase()); + info.put("PGPORT", hostSpecs.stream() + .map(o -> String.valueOf(o.getPort())) + .collect(joining(","))); + info.put("PGHOSTURL", hostSpecs.stream() + .map(HostSpec::getHost) + .collect(joining(","))); + info.put("PGHOST", hostSpecs.stream() + .map(HostSpec::getHost) + .collect(joining(","))); + info.put("PGPORTURL", hostSpecs.stream() + .map(o -> String.valueOf(o.getPort())) + .collect(joining(","))); + return info; + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterNodeCacheTest.java b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterNodeCacheTest.java new file mode 100644 index 0000000..172ed92 --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/clusterhealthy/ClusterNodeCacheTest.java @@ -0,0 +1,155 @@ +package org.postgresql.clusterhealthy; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static java.util.stream.Collectors.joining; +import static org.postgresql.clusterhealthy.ClusterNodeCache.checkHostSpecs; + +public class ClusterNodeCacheTest { + + private List getHostSpecs() { + List hostSpecList = new ArrayList<>(); + hostSpecList.add(new HostSpec(System.getProperty("server"), + Integer.parseInt(System.getProperty("port")))); + hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"), + Integer.parseInt(System.getProperty("secondaryPort")))); + hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"), + Integer.parseInt(System.getProperty("secondaryServerPort2")))); + return hostSpecList; + } + private static String getUrl() { + List list = new ArrayList<>(); + list.add(System.getProperty("server") + ":" + System.getProperty("port")); + list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort")); + list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2")); + String serverAndPort = list.stream() + .collect(joining(",")); + String database = getDatabase(); + return String.format("jdbc:postgresql://%s/%s", serverAndPort, database); + } + + private static String getDatabase() { + return System.getProperty("database"); + } + + private static String getUsername () { + return System.getProperty("username"); + } + + private static String getPassword() { + return System.getProperty("password"); + } + + private static Properties getProperties(List hostSpecs) { + + Properties properties = new Properties(); + properties.put("user", getUsername()); + properties.put("password", getPassword()); + Properties info = new Properties(properties); + info.put("PGDBNAME", getDatabase()); + info.put("PGPORT", hostSpecs.stream() + .map(o -> String.valueOf(o.getPort())) + .collect(joining(","))); + info.put("PGHOSTURL", hostSpecs.stream() + .map(HostSpec::getHost) + .collect(joining(","))); + info.put("PGHOST", hostSpecs.stream() + .map(HostSpec::getHost) + .collect(joining(","))); + info.put("PGPORTURL", hostSpecs.stream() + .map(o -> String.valueOf(o.getPort())) + .collect(joining(","))); + return info; + } + + @Before + public void initDirver() throws Exception { + TestUtil.initDriver(); + } + @Test + public void getClusterConnection() throws SQLException { +// Class.forName("org.postgresql.Driver"); + + DriverManager.getConnection(getUrl(), getUsername(), getPassword()); + + } + + @Test + public void testPeriodTime() throws Exception { + String url = getUrl() + "?targetServerType=master"; + DriverManager.getConnection(getUrl(), getUsername(), getPassword()); + String newUrl = url + "&heartbeatPeriod=3000"; + DriverManager.getConnection(newUrl, getUsername(), getPassword()); + Thread.sleep(10000L); + } + + @Test + public void testUnavailablePassword() throws ClassNotFoundException, SQLException, InterruptedException { +// Class.forName("org.postgresql.Driver"); + String url = getUrl() + "?targetServerType=master"; + DriverManager.getConnection(url, getUsername(), getPassword()); + HostSpec master = new HostSpec(System.getProperty("server"), + Integer.parseInt(System.getProperty("port"))); + ClusterHeartBeatMaster instance = ClusterHeartBeatMaster.getInstance(); + Set properties = instance.getProperties(master); + int before = properties.size(); + // Change password + Thread.sleep(10000L); + + Set afterProperties = instance.getProperties(master); + int after = afterProperties.size(); + + Assert.assertNotEquals(after, before); + + Thread.sleep(10000L); + } + + @Test + public void testCheckReplacement() { + HostSpec master = new HostSpec(System.getProperty("server"), + Integer.parseInt(System.getProperty("port"))); + Map failureMap = ClusterHeartBeatFailureMaster.failureMap; + HostSpec node = new HostSpec("10.10.0.1", 2525); + failureMap.put(master, node); + List hostSpecList = getHostSpecs(); + HostSpec[] hostSpecs = hostSpecList.toArray(new HostSpec[0]); + ClusterNodeCache.checkReplacement(hostSpecs); + boolean containsMaster = false; + boolean containsNode = false; + for (HostSpec hostSpec : hostSpecs) { + if (hostSpec.equals(master)) { + containsMaster = true; + } + if (hostSpec.equals(node)) { + containsNode = true; + } + } + Assert.assertTrue(containsNode); + Assert.assertFalse(containsMaster); + } + + @Test + public void testpushHostSpecs() throws Exception { + + List hostSpecs = getHostSpecs(); + Properties info = getProperties(hostSpecs); + checkHostSpecs(hostSpecs.toArray(new HostSpec[0]), info); + System.out.println(ClusterNodeCache.isOpen()); + Thread.sleep(30000); + } + + + +}