!105 JDBC高可用优化

Merge pull request !105 from 陈紫阳/master
This commit is contained in:
opengauss-bot
2023-02-28 14:07:46 +00:00
committed by Gitee
14 changed files with 1260 additions and 17 deletions

View File

@ -5,7 +5,9 @@ import org.postgresql.jdbc.PgConnection;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -26,6 +28,16 @@ public class GlobalConnectionTracker {
return PGProperty.FORCE_TARGET_SERVER_SLAVE.getBoolean(props) &&
("slave".equals(PGProperty.TARGET_SERVER_TYPE.get(props)) || "secondary".equals(PGProperty.TARGET_SERVER_TYPE.get(props)));
}
/**
*
* @param props the parsed/defaulted connection properties
* @return
*/
private static boolean isTargetServerMaster(Properties props) {
return "master".equals(PGProperty.TARGET_SERVER_TYPE.get(props));
}
/**
* Store the actual query executor and connection host spec.
*
@ -33,7 +45,9 @@ public class GlobalConnectionTracker {
* @param queryExecutor
*/
public static void possessConnectionReference(QueryExecutor queryExecutor, Properties props) {
if(!isForceTargetServerSlave(props)) return;
if (!isForceTargetServerSlave(props) || !isTargetServerMaster(props)) {
return;
}
int identityQueryExecute = System.identityHashCode(queryExecutor);
String hostSpec = queryExecutor.getHostSpec().toString();
synchronized (connectionManager) {
@ -53,7 +67,9 @@ public class GlobalConnectionTracker {
* @param queryExecutor
*/
public static void releaseConnectionReference(QueryExecutor queryExecutor, Properties props) {
if(!isForceTargetServerSlave(props)) return;
if (!isForceTargetServerSlave(props)) {
return;
}
String hostSpec = queryExecutor.getHostSpec().toString();
int identityQueryExecute = System.identityHashCode(queryExecutor);
synchronized (connectionManager) {
@ -93,4 +109,46 @@ public class GlobalConnectionTracker {
}
}
/**
* Close all existing connections under the specified host.
*
* @param hostSpec ip and port.
*/
public static void closeConnectionOfCrash(String hostSpec) {
synchronized (connectionManager) {
HashMap<Integer, QueryExecutor> hostConnection = connectionManager.getOrDefault(hostSpec, null);
if (hostConnection != null) {
LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, start to close the original connection.");
for (QueryExecutor queryExecutor : hostConnection.values()) {
if (queryExecutor != null && !queryExecutor.isClosed()) {
queryExecutor.close();
queryExecutor.setAvailability(false);
}
}
hostConnection.clear();
LOGGER.debug("[CRASH] The hostSpec: " + hostSpec + " fails, end to close the original connection.");
}
}
}
/**
* get all existing connections under the specified host.
*
* @param hostSpec ip and port.
*/
public static List<QueryExecutor> getConnections(String hostSpec) {
synchronized (connectionManager) {
List<QueryExecutor> ret = new ArrayList<>();
HashMap<Integer, QueryExecutor> hostConnection = connectionManager.getOrDefault(hostSpec, null);
if (hostConnection != null) {
for (Map.Entry<Integer, QueryExecutor> queryExecutorEntry : hostConnection.entrySet()) {
ret.add(queryExecutorEntry.getValue());
}
}
return ret;
}
}
}

View File

@ -198,12 +198,12 @@ public enum PGProperty {
*/
SSL_MODE("sslmode", null, "Parameter governing the use of SSL", false,
"disable", "allow", "prefer", "require", "verify-ca", "verify-full"),
/**
* Context of SSL(SSLContext.getInstance("@code")): empty for TLS,valid values{SSL/SSLv2/SSLv3/TLS/TLSv1/TLSv1.1/TLSv1.2}
*/
SSL_CONTEXT("sslcontext", null, "Control use of SSL Context(SSL, TLS, TLSv1.2, etc)"),
/**
* Classname of the SSL Factory to use (instance of {@code javax.net.ssl.SSLSocketFactory}).
*/
@ -252,7 +252,7 @@ public enum PGProperty {
SSL_PRIVATEKEY_FACTORY("sslprivatekeyfactory", null,
"The privatekey factory for the client's ssl"),
/**
* The classname instantiating {@code javax.security.auth.callback.CallbackHandler} to use.
*/
@ -287,7 +287,7 @@ public enum PGProperty {
* value of zero means that it is disabled.
*/
SOCKET_TIMEOUT("socketTimeout", "0", "The timeout value used for socket read operations."),
/**
* The timeout value used for socket read operations when jdbc connecting. If reading from the server takes longer than
* this value, the connection is closed. This can be used as both a brute force global query
@ -339,7 +339,7 @@ public enum PGProperty {
* The application name (require server version &gt;= 9.0).
*/
APPLICATION_NAME("ApplicationName", DriverInfo.DRIVER_NAME, "Name of the Application (backend >= 9.0)"),
APPLICATION_TYPE("ApplicationType", null, "Application Type"),
/**
@ -532,6 +532,11 @@ public enum PGProperty {
"",
"Factory class to instantiate factories for XML processing"),
/**
* It is used to detect the thread interval of the survival task on the primary node in the high availability scenario.
*/
HEARTBEAT_PERIOD("heartbeatPeriod", "0", "heartbeat interval time")
;
private String _name;

View File

@ -0,0 +1,276 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.PGProperty;
import org.postgresql.core.PGStream;
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.v3.ConnectionFactoryImpl;
import org.postgresql.core.v3.QueryExecutorImpl;
import org.postgresql.jdbc.SslMode;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import javax.net.SocketFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.postgresql.GlobalConnectionTracker.closeConnectionOfCrash;
import static org.postgresql.GlobalConnectionTracker.getConnections;
import static org.postgresql.clusterhealthy.ClusterNodeCache.isOpen;
import static org.postgresql.util.PSQLState.CONNECTION_REJECTED;
/**
* Heartbeat detection, active detection of the primary node, and maintenance of cache after failure The next
* contest pair active detection of the failed node, detection of the cluster where the primary node has failed,
* and maintenance of the latest information to the cache set。
*/
public class ClusterHeartBeat {
public static final Map<HostSpec, Set<Properties>> CLUSTER_PROPERTIES = new ConcurrentHashMap<>();
private static Log LOGGER = Logger.getLogger(ClusterHeartBeat.class.getName());
private static final ConnectionFactoryImpl FACTORY = new ConnectionFactoryImpl();
private static final String UPDATE_TIME = "time";
private volatile Long periodTime = 5000L;
/**
* heartbeat task thread.
*/
public void masterNodeProbe() {
while (isOpen()) {
LOGGER.debug("heartBeat thread start time: " + new Date(System.currentTimeMillis()));
// failed node detection
ClusterHeartBeatFailureMaster.getInstance().run();
// active primary node detection
ClusterHeartBeatMaster.getInstance().run();
// The failed cluster seeks the primary node
ClusterHeartBeatFailureCluster.getInstance().run();
try {
Thread.sleep(periodTime);
} catch (InterruptedException e) {
LOGGER.debug(e.getStackTrace());
}
}
periodTime = 5000L;
}
/**
* get parsed/defaulted connection properties
* @param hostSpec ip and port
* @return properties set
*/
public Set<Properties> getProperties(HostSpec hostSpec) {
synchronized (CLUSTER_PROPERTIES) {
return CLUSTER_PROPERTIES.computeIfAbsent(hostSpec, k -> new HashSet<>());
}
}
public Map<HostSpec, Set<HostSpec>> getClusterRelationship () {
return ClusterHeartBeatMaster.getInstance().getClusterRelationship();
}
/**
* Adding Cluster Information
* @param key master
* @param value secondary list
* @param properties the parsed/defaulted connection properties
*/
public void addNodeRelationship (HostSpec key, HostSpec[] value, Properties properties) {
// update node host and ip
addClusterNode(key, value);
// update node properties
addProperties(key, Collections.singleton(properties));
if (PGProperty.HEARTBEAT_PERIOD.get(properties) != null) {
String period = PGProperty.HEARTBEAT_PERIOD.get(properties);
long time = Long.parseLong(period);
synchronized (UPDATE_TIME) {
if (time > 0) {
periodTime = Math.min(periodTime, time);
}
}
}
}
/**
*
* @param hostSpec ip and port
* @param properties the parsed/defaulted connection properties
*/
public void addProperties(HostSpec hostSpec, Set<Properties> properties) {
synchronized (CLUSTER_PROPERTIES) {
Set<Properties> propertiesSet = CLUSTER_PROPERTIES.get(hostSpec);
if (propertiesSet == null) {
propertiesSet = new HashSet<>();
}
propertiesSet.addAll(properties);
CLUSTER_PROPERTIES.put(hostSpec, propertiesSet);
}
}
/**
* Update the cluster node relationship
* @param key old master
* @param newKey new master
* @param slaves slaves set
*/
public void removeClusterNode(HostSpec key, HostSpec newKey, Set<HostSpec> slaves) {
ClusterHeartBeatMaster.getInstance().removeClusterNode(key, newKey, slaves);
}
/**
* Perform cache maintenance on cluster nodes connected to hosts
* @param hostSpecs the parsed/defaulted connection properties
* @param value slaves set
*/
public void addClusterNode(HostSpec hostSpecs, HostSpec... value) {
ClusterHeartBeatMaster.getInstance().addClusterNode(hostSpecs, value);
}
/**
* Get the cluster slave node
* @param hostSpec the parsed/defaulted connection properties
* @return slaves set
*/
public Set<HostSpec> getClusterSalveNode(HostSpec hostSpec) {
return ClusterHeartBeatMaster.getInstance().getClusterSalveNode(hostSpec);
}
/**
*
* @param hostSpec ip and port
* @param properties the parsed/defaulted connection properties
*/
public void removeProperties(HostSpec hostSpec, Properties properties) {
synchronized (CLUSTER_PROPERTIES) {
Set<Properties> propertiesSet = CLUSTER_PROPERTIES.getOrDefault(hostSpec, null);
if (propertiesSet != null) {
propertiesSet.remove(properties);
CLUSTER_PROPERTIES.put(hostSpec, propertiesSet);
}
}
}
/**
* the node probes the activity by reflecting the tryConnect() method.
*
* @param hostSpec ip and port.
* @param propSet the parsed/defaulted connection properties
* @return QueryExecutor
* @throws SQLException new sql exception
*/
public QueryExecutor getQueryExecutor(HostSpec hostSpec, Set<Properties> propSet) throws SQLException {
Properties props = null;
try {
for (Properties properties : propSet) {
props = properties;
SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(props);
SslMode sslMode = SslMode.of(props);
String user = props.getProperty("user", "");
String database = props.getProperty("PGDBNAME", "");
PGStream pgStream = FACTORY.tryConnect(user, database, props, socketFactory, hostSpec, sslMode);
return new QueryExecutorImpl(pgStream, user, database,
1000, props);
}
} catch (SQLException e) {
String sqlState = e.getSQLState();
if (CONNECTION_REJECTED.getState().equals(sqlState) || "28P01".equals(sqlState)) {
LOGGER.debug("node " + hostSpec + " is active, and connenction authentication fails.");
LOGGER.debug("remove before propSet size :" + propSet.size());
removeProperties(hostSpec, props);
LOGGER.debug("remove after propSet size :" + propSet.size());
}
LOGGER.debug("acquire QueryExecutor failure " + e.getMessage());
} catch (IOException e) {
LOGGER.debug("acquire QueryExecutor failure " + e.getMessage());
LOGGER.debug(e.getCause());
}
throw new SQLException();
}
/**
* Check whether the node is the primary node
*
* @param queryExecutor queryExector
* @return true/false
*/
public boolean nodeRoleIsMaster(QueryExecutor queryExecutor) {
try {
return FACTORY.isMaster(queryExecutor);
} catch (SQLException | IOException e) {
LOGGER.debug("Error obtaining node role " + e.getMessage());
LOGGER.debug(e.getStackTrace());
return false;
}
}
/**
* Post-processing after the primary node fails
*
* @param hostSpec master ip and port
* @param slaves slaves set
* @param props the parsed/defaulted connection properties
*/
public void cacheProcess(HostSpec hostSpec, Set<HostSpec> slaves, Set<Properties> props) {
HostSpec maseterNode = findMasterNode(slaves, props);
removeClusterNode(hostSpec, maseterNode, slaves);
if (maseterNode != null) {
addProperties(maseterNode, props);
ClusterHeartBeatFailureMaster.getInstance().addFailureMaster(hostSpec, maseterNode);
} else {
FailureCluster cluster = new FailureCluster(hostSpec, slaves, props);
ClusterHeartBeatFailureCluster.getInstance().addFailureCluster(cluster);
}
closeConnectionOfCrash(hostSpec.toString());
}
/**
* Locate the host on the standby computer
*
* @param hostSpecSet slaves set
* @param properties the parsed/defaulted connection properties
* @return new master node
*/
public HostSpec findMasterNode(Set<HostSpec> hostSpecSet, Set<Properties> properties) {
for (HostSpec hostSpec : hostSpecSet) {
List<QueryExecutor> queryExecutorList = getConnections(hostSpec.toString());
for (QueryExecutor executor : queryExecutorList) {
if (!executor.isClosed() && nodeRoleIsMaster(executor)) {
return hostSpec;
}
}
QueryExecutor queryExecutor = null;
try {
queryExecutor = getQueryExecutor(hostSpec, properties);
} catch (SQLException e) {
continue;
}
boolean isMaster = nodeRoleIsMaster(queryExecutor);
if (isMaster) {
return hostSpec;
}
}
return null;
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.core.QueryExecutor;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* After the primary node breaks down during heartbeat maintenance,
* the cluster is in the no-host state. This section describes how to detect hosts in a cluster without hosts.
*/
public class ClusterHeartBeatFailureCluster extends ClusterHeartBeat{
public static List<FailureCluster> failureCluster = new ArrayList<>();
private volatile static ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster;
private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureCluster.class.getName());
private ClusterHeartBeatFailureCluster() {
}
public static synchronized ClusterHeartBeatFailureCluster getInstance() {
if (clusterHeartBeatFailureCluster == null) {
clusterHeartBeatFailureCluster = new ClusterHeartBeatFailureCluster();
}
return clusterHeartBeatFailureCluster;
}
/**
* After the active node fails, find a new active node on the standby node or check whether the active node is resurrected
*/
public void run() {
if (failureCluster.isEmpty()) {
return;
}
List<FailureCluster> list = new ArrayList<>(failureCluster);
failureCluster.clear();
LOGGER.debug("cluster does not have a master node" + list);
for (FailureCluster cluster : list) {
QueryExecutor queryExecutor = null;
try {
if (cluster == null || cluster.getMaster() == null) {
continue;
}
queryExecutor = getQueryExecutor(cluster.getMaster(), cluster.getProps());
} catch (SQLException e) {
Set<HostSpec> salves = cluster.getSalves();
int count = 0;
for (HostSpec salf : salves) {
try {
getQueryExecutor(salf, cluster.getProps());
} catch (SQLException ex) {
count++;
}
}
if (count == salves.size()) {
continue;
}
cacheProcess(cluster.getMaster(), salves, cluster.getProps());
}
if (queryExecutor != null) {
boolean isMaster = nodeRoleIsMaster(queryExecutor);
if (isMaster) {
addClusterNode(cluster.getMaster(), cluster.getSalves().toArray(new HostSpec[0]));
addProperties(cluster.getMaster(), cluster.getProps());
} else {
HostSpec maseterNode = findMasterNode(cluster.getSalves(), cluster.getProps());
if (maseterNode != null) {
addProperties(maseterNode, cluster.getProps());
Set<HostSpec> salves = cluster.getSalves();
salves.add(cluster.getMaster());
removeClusterNode(cluster.getMaster(), maseterNode, salves);
}
}
}
}
}
public void addFailureCluster(FailureCluster cluster) {
failureCluster.add(cluster);
}
public List<FailureCluster> getFailureCluster() {
return failureCluster;
}
}

View File

@ -0,0 +1,117 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.core.QueryExecutor;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* If only one host in the cluster is down, and a new host is selected,
* the relationship between the faulty host and the new host needs to be maintained.
* When the faulty host recovers, it needs to be added to the node again for maintenance
*/
public class ClusterHeartBeatFailureMaster extends ClusterHeartBeat{
public static Map<HostSpec, HostSpec> failureMap = new ConcurrentHashMap<>();
private volatile static ClusterHeartBeatFailureMaster clusterHeartBeatFailureMaster;
private static Log LOGGER = Logger.getLogger(ClusterHeartBeatFailureMaster.class.getName());
private ClusterHeartBeatFailureMaster() {
}
public static synchronized ClusterHeartBeatFailureMaster getInstance() {
if (clusterHeartBeatFailureMaster == null) {
clusterHeartBeatFailureMaster = new ClusterHeartBeatFailureMaster();
}
return clusterHeartBeatFailureMaster;
}
/**
* If the failed node is alive, join cache maintenance
*/
public void run() {
HashMap<HostSpec, HostSpec> failureMapClone = new HashMap<>(failureMap);
LOGGER.debug("failure node " + failureMapClone);
for (Map.Entry<HostSpec, HostSpec> next : failureMapClone.entrySet()) {
HostSpec key = next.getKey();
HostSpec value = next.getValue();
Set<Properties> properties = getProperties(key);
QueryExecutor queryExecutor = null;
try {
queryExecutor = getQueryExecutor(key, properties);
failureMap.remove(key);
} catch (SQLException e) {
LOGGER.error(key.toString() + " tryConnect failure.");
continue;
}
boolean isMaster = nodeRoleIsMaster(queryExecutor);
if (isMaster) {
HostSpec current = value;
while (failureMap.containsKey(current)) {
current = failureMap.get(current);
}
if (getClusterRelationship().containsKey(current)) {
Set<Properties> prop = getProperties(key);
boolean currentIsMaster;
try {
QueryExecutor currentQueryExecutor = getQueryExecutor(current, prop);
currentIsMaster = nodeRoleIsMaster(currentQueryExecutor);
} catch (SQLException e) {
currentIsMaster = false;
}
if (!currentIsMaster) {
Set<HostSpec> set = getClusterSalveNode(current);
set.add(current);
addClusterNode(key, set.toArray(new HostSpec[0]));
}
}
} else {
HostSpec current = value;
while (failureMap.containsKey(current)) {
if (current == failureMap.get(current)) {
failureMap.remove(current);
break;
}
current = failureMap.get(current);
}
addClusterNode(current, key);
}
}
}
public void addFailureMaster(HostSpec hostSpec, HostSpec maseterNode) {
failureMap.put(hostSpec, maseterNode);
}
public Map<HostSpec, HostSpec> getFailureMaster() {
return failureMap;
}
public void remove(HostSpec hostSpec) {
failureMap.remove(hostSpec);
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.core.QueryExecutor;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.postgresql.GlobalConnectionTracker.getConnections;
/**
* Maintain the primary node of the cluster. When the primary node breaks down,
* you need to detect a new host and bind the faulty host to the new host to facilitate
* quick switchover when the failed node is connected
*/
public class ClusterHeartBeatMaster extends ClusterHeartBeat {
public static final Map<HostSpec, Set<HostSpec>> CLUSTER_NODE_RELATIONSHIP = new ConcurrentHashMap<>();
private volatile static ClusterHeartBeatMaster ClusterHeartBeatMaster;
private static Log LOGGER = Logger.getLogger(ClusterHeartBeatMaster.class.getName());
private ClusterHeartBeatMaster() {
}
public static synchronized ClusterHeartBeatMaster getInstance() {
if (ClusterHeartBeatMaster == null) {
ClusterHeartBeatMaster = new ClusterHeartBeatMaster();
}
return ClusterHeartBeatMaster;
}
/**
* the primary node is active and added to the failure set after failure
*/
public void run() {
Iterator<Map.Entry<HostSpec, Set<HostSpec>>> iterator = CLUSTER_NODE_RELATIONSHIP.entrySet().iterator();
LOGGER.debug("master nodes " + CLUSTER_NODE_RELATIONSHIP);
while (iterator.hasNext()) {
Map.Entry<HostSpec, Set<HostSpec>> nodeMap = iterator.next();
HostSpec master = nodeMap.getKey();
Set<HostSpec> slaves = nodeMap.getValue();
LOGGER.debug("Current node " + master + " Standby node " + slaves);
QueryExecutor queryExecutor = null;
List<QueryExecutor> queryExecutorList = getConnections(master.toString());
for (QueryExecutor executor : queryExecutorList) {
if (!executor.isClosed()) {
queryExecutor = executor;
break;
}
}
Set<Properties> propertiesSet = getProperties(master);
if (queryExecutor == null) {
try {
queryExecutor = super.getQueryExecutor(master, propertiesSet);
} catch (SQLException e) {
LOGGER.debug("acquire QueryExecutor failure");
super.cacheProcess(master, slaves, propertiesSet);
continue;
}
}
LOGGER.debug("Information about the current connected node " + queryExecutor.getSocketAddress());
if (!super.nodeRoleIsMaster(queryExecutor)) {
LOGGER.debug(master + ":The host is degraded to the standby server.");
super.cacheProcess(master, slaves, propertiesSet);
}
}
}
public Map<HostSpec, Set<HostSpec>> getClusterRelationship() {
return CLUSTER_NODE_RELATIONSHIP;
}
public void addClusterNode(HostSpec hostSpecs, HostSpec... value) {
synchronized (CLUSTER_NODE_RELATIONSHIP) {
Set<HostSpec> hostSpecSet = CLUSTER_NODE_RELATIONSHIP.computeIfAbsent(hostSpecs, k -> new HashSet<>());
Arrays.stream(value)
.filter(host -> !host.equals(hostSpecs))
.forEach(hostSpecSet::add);
CLUSTER_NODE_RELATIONSHIP.put(hostSpecs, hostSpecSet);
}
}
public void removeClusterNode(HostSpec key, HostSpec newKey, Set<HostSpec> slaves) {
synchronized (CLUSTER_NODE_RELATIONSHIP) {
CLUSTER_NODE_RELATIONSHIP.remove(key);
if (newKey != null) {
Set<HostSpec> hostSpecSet = CLUSTER_NODE_RELATIONSHIP.get(newKey);
if (hostSpecSet == null) {
hostSpecSet = new HashSet<>();
}
hostSpecSet.addAll(slaves);
hostSpecSet.remove(newKey);
CLUSTER_NODE_RELATIONSHIP.put(newKey, hostSpecSet);
}
}
}
public Set<HostSpec> get(HostSpec hostSpec) {
synchronized (CLUSTER_NODE_RELATIONSHIP) {
return CLUSTER_NODE_RELATIONSHIP.get(hostSpec);
}
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.PGProperty;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* Cluster information processing, cache connection information, whether to start the heartbeat thread
*/
public class ClusterNodeCache {
private static volatile boolean status;
private static final String STATUS_LOCK = "status";
private static Log LOGGER = Logger.getLogger(ClusterNodeCache.class.getName());
private static final ExecutorService executorService = Executors.newCachedThreadPool();
private static final ClusterHeartBeat CLUSTER_HEART_BEAT = new ClusterHeartBeat();
public static boolean isOpen() {
return status;
}
/**
* The faulty host is switched to a new host
* @param hostSpecs the parsed/defaulted connection properties
*/
public static void checkReplacement(HostSpec[] hostSpecs) {
if (hostSpecs == null) {
return;
}
ClusterHeartBeatFailureMaster failureMaster = ClusterHeartBeatFailureMaster.getInstance();
Map<HostSpec, HostSpec> failureMap = failureMaster.getFailureMaster();
for (int i = 0; i < hostSpecs.length; i++) {
while (failureMap.containsKey(hostSpecs[i])) {
if (hostSpecs[i] == failureMap.get(hostSpecs[i])) {
failureMaster.remove(hostSpecs[i]);
return;
}
hostSpecs[i] = failureMap.get(hostSpecs[i]);
}
}
}
/**
* Verify parameters and replace the failed primary node
* @param hostSpecs cluster node
* @param properties the parsed/defaulted connection properties
*/
public static void checkHostSpecs(HostSpec[] hostSpecs, Properties properties) throws PSQLException {
// check the interval of heartbeat threads.
Set<HostSpec> set = Arrays.stream(hostSpecs)
.collect(Collectors.toSet());
if (set.size() > 1) {
checkReplacement(hostSpecs);
}
}
/**
*
* @param master master node
* @param hostSpecs cluster node
* @param properties the parsed/defaulted connection properties
*/
public static void pushHostSpecs(HostSpec master, HostSpec[] hostSpecs, Properties properties) {
Set<HostSpec> set = Arrays.stream(hostSpecs)
.collect(Collectors.toSet());
String period = PGProperty.HEARTBEAT_PERIOD.get(properties);
boolean open = true;
if (!isNumeric(period)) {
open = false;
LOGGER.info("Invalid heartbeatPeriod value: " + period);
}
long timePeriod = Long.parseLong(period);
if (timePeriod <= 0) {
open = false;
LOGGER.info("Invalid heartbeatPeriod value: " + period);
}
if (set.size() > 1 && open) {
CLUSTER_HEART_BEAT.addNodeRelationship(master, hostSpecs, properties);
start();
}
}
private static void start() {
if (status) {
LOGGER.info("heartbeat thread ----> started");
return;
}
synchronized (STATUS_LOCK) {
if (status) {
LOGGER.info("heartbeat thread ----> started");
return;
}
status = true;
}
executorService.execute(CLUSTER_HEART_BEAT::masterNodeProbe);
}
public static void stop() {
status = false;
}
private static boolean isNumeric(final CharSequence cs) {
if (cs.length() == 0) {
return false;
}
final int size = cs.length();
for (int i = 0; i < size; i++) {
if (!Character.isDigit(cs.charAt(i))) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.clusterhealthy;
import org.postgresql.util.HostSpec;
import java.util.Properties;
import java.util.Set;
/**
* Cluster information instance, including primary and secondary nodes and connection information
*/
public class FailureCluster {
private HostSpec master;
private Set<HostSpec> salves;
private Set<Properties> props;
/**
*
* @param master Current master node
* @param salves Slave set
* @param props Connection information
*/
public FailureCluster(HostSpec master, Set<HostSpec> salves, Set<Properties> props) {
this.master = master;
this.salves = salves;
this.props = props;
}
public void setSalves(Set<HostSpec> salves) {
this.salves = salves;
}
public void setProps(Set<Properties> props) {
this.props = props;
}
public void setMaster(HostSpec master) {
this.master = master;
}
public HostSpec getMaster() {
return master;
}
public Set<HostSpec> getSalves() {
return salves;
}
public Set<Properties> getProps() {
return props;
}
}

View File

@ -9,6 +9,7 @@ package org.postgresql.core.v3;
import org.postgresql.PGProperty;
import org.postgresql.clusterchooser.ClusterStatus;
import org.postgresql.clusterchooser.GlobalClusterStatusTracker;
import org.postgresql.clusterhealthy.ClusterNodeCache;
import org.postgresql.core.ConnectionFactory;
import org.postgresql.core.PGStream;
import org.postgresql.core.QueryExecutor;
@ -58,7 +59,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
private static final int AUTH_REQ_GSS = 7;
private static final int AUTH_REQ_GSS_CONTINUE = 8;
private static final int AUTH_REQ_SSPI = 9;
public static String CLIENT_ENCODING = "UTF8";
public static String USE_BOOLEAN = "false";
private static final int AUTH_REQ_SHA256 = 10;
@ -98,7 +99,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
setStaticUseBoolean(useBoolean);
}
private void setSocketTimeout(PGStream stream, Properties info, PGProperty propKey) throws SQLException, IOException {
// Set the socket timeout if the "socketTimeout" property has been set.
int socketTimeout = Integer.parseInt(propKey.getDefaultValue());
@ -112,14 +113,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
}
}
private PGStream tryConnect(String user, String database,
public PGStream tryConnect(String user, String database,
Properties info, SocketFactory socketFactory, HostSpec hostSpec,
SslMode sslMode)
throws SQLException, IOException {
int connectTimeout = Integer.parseInt(PGProperty.CONNECT_TIMEOUT.getDefaultValue());
if (PGProperty.CONNECT_TIMEOUT.getInt(info) <= Integer.MAX_VALUE / 1000) {
connectTimeout = PGProperty.CONNECT_TIMEOUT.getInt(info) * 1000;
} else {
} else {
LOGGER.debug("integer connectTimeout is too large, it will occur error after multiply by 1000.");
}
@ -211,6 +212,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
ClusterSpec clusterSpec = clusterIter.next();
HostSpec[] currentHostSpecs = clusterSpec.getHostSpecs();
if (currentHostSpecs.length > 1 && targetServerType == HostRequirement.master) {
ClusterNodeCache.checkHostSpecs(currentHostSpecs, info);
}
HostChooser hostChooser =
HostChooserFactory.createHostChooser(currentHostSpecs, targetServerType, info);
Iterator<CandidateHost> hostIter = hostChooser.iterator();
@ -326,6 +330,9 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
if (candidateHost.targetServerType != HostRequirement.any) {
hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Secondary;
LOGGER.info("Known status of host " + hostSpec + " is " + hostStatus);
if (hostStatus == HostStatus.Master) {
ClusterNodeCache.pushHostSpecs(hostSpec, currentHostSpecs, info);
}
}
GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus, info);
knownStates.put(hostSpec, hostStatus);
@ -588,7 +595,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
else if(this.protocolVerion == PROTOCOL_VERSION_350)
pgStream.sendInteger2(50); // protocol minor
else if(this.protocolVerion == PROTOCOL_VERSION_351)
pgStream.sendInteger2(51); // protocol minor
pgStream.sendInteger2(51); // protocol minor
for (byte[] encodedParam : encodedParams) {
pgStream.send(encodedParam);
pgStream.sendChar(0);
@ -855,8 +862,8 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
Utils.escapeLiteral(sql, appName, queryExecutor.getStandardConformingStrings());
sql.append("'");
SetupQueryRunner.run(queryExecutor, sql.toString(), false);
}
}
String appType = PGProperty.APPLICATION_TYPE.get(info);
if (appType !=null && !appType.equals(queryExecutor.getApplicationType())) {
StringBuilder sql = new StringBuilder();
@ -867,7 +874,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
}
}
private boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException {
public boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException {
String localRole = "";
String dbState = "";
List<byte[][]> results = SetupQueryRunner.runForList(queryExecutor, "select local_role, db_state from pg_stat_get_stream_replications();", true);
@ -885,8 +892,8 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
String datcompatibility = queryExecutor.getEncoding().decode(result[0]);
return datcompatibility == null ? "PG" : datcompatibility;
}
private String queryGaussdbVersion(QueryExecutor queryExecutor) throws SQLException, IOException {
byte[][] result = SetupQueryRunner.run(queryExecutor, "select version();", true);
String version = queryExecutor.getEncoding().decode(result[0]);

View File

@ -0,0 +1,38 @@
package org.postgresql.clusterhealthy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs;
import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getProperties;
public class ClusterHeartBeatFailureClusterTest {
@Before
public void initDirver() throws Exception {
TestUtil.initDriver();
}
@Test
public void runTest() {
ClusterHeartBeatFailureCluster clusterHeartBeatFailureCluster = ClusterHeartBeatFailureCluster.getInstance();
List<HostSpec> hostSpecs = getHostSpecs();
HostSpec master = hostSpecs.get(0);
hostSpecs.remove(master);
HashSet<Properties> set = new HashSet<>();
set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs()));
FailureCluster failureCluster = new FailureCluster(master, new HashSet<>(hostSpecs), set);
clusterHeartBeatFailureCluster.addFailureCluster(failureCluster);
System.out.println(clusterHeartBeatFailureCluster.getFailureCluster());
clusterHeartBeatFailureCluster.run();
System.out.println(clusterHeartBeatFailureCluster.getFailureCluster());
Assert.assertTrue(clusterHeartBeatFailureCluster.getFailureCluster().size() == 0);
}
}

View File

@ -0,0 +1,35 @@
package org.postgresql.clusterhealthy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs;
public class ClusterHeartBeatFailureMasterTest {
@Before
public void initDirver() throws Exception {
TestUtil.initDriver();
}
@Test
public void runTest() {
ClusterHeartBeatFailureMaster instance = ClusterHeartBeatFailureMaster.getInstance();
List<HostSpec> hostSpecs = getHostSpecs();
instance.addFailureMaster(hostSpecs.get(0), hostSpecs.get(1));
HashSet<Properties> set = new HashSet<>();
set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs()));
instance.addProperties(hostSpecs.get(0), set);
instance.addProperties(hostSpecs.get(1), set);
Assert.assertTrue(instance.getFailureMaster().size() != 0);
instance.run();
System.out.println(instance.getFailureMaster());
Assert.assertTrue(instance.getFailureMaster().size() == 0);
}
}

View File

@ -0,0 +1,40 @@
package org.postgresql.clusterhealthy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.postgresql.clusterhealthy.ClusterHeartBeatUtil.getHostSpecs;
public class ClusterHeartBeatMasterTest {
@Before
public void initDirver() throws Exception {
TestUtil.initDriver();
}
@Test
public void runTest() {
ClusterHeartBeatMaster clusterHeartBeatMaster = ClusterHeartBeatMaster.getInstance();
Map<HostSpec, Set<HostSpec>> clusterRelationship = clusterHeartBeatMaster.getClusterRelationship();
List<HostSpec> hostSpecs = getHostSpecs();
HostSpec master = hostSpecs.get(0);
hostSpecs.remove(master);
clusterRelationship.put(master, new HashSet<>(hostSpecs));
HashSet<Properties> set = new HashSet<>();
set.add(ClusterHeartBeatUtil.getProperties(getHostSpecs()));
clusterHeartBeatMaster.addProperties(master, set);
clusterHeartBeatMaster.run();
Assert.assertTrue(clusterRelationship.containsKey(master));
}
}

View File

@ -0,0 +1,66 @@
package org.postgresql.clusterhealthy;
import org.postgresql.util.HostSpec;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import static java.util.stream.Collectors.joining;
public class ClusterHeartBeatUtil {
public static List<HostSpec> getHostSpecs() {
List<HostSpec> hostSpecList = new ArrayList<>();
hostSpecList.add(new HostSpec(System.getProperty("server"),
Integer.parseInt(System.getProperty("port"))));
hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"),
Integer.parseInt(System.getProperty("secondaryPort"))));
hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"),
Integer.parseInt(System.getProperty("secondaryServerPort2"))));
return hostSpecList;
}
public static String getUrl() {
List<String> list = new ArrayList<>();
list.add(System.getProperty("server") + ":" + System.getProperty("port"));
list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort"));
list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2"));
String serverAndPort = list.stream()
.collect(joining(","));
String database = getDatabase();
return String.format("jdbc:postgresql://%s/%s", serverAndPort, database);
}
public static String getDatabase() {
return System.getProperty("database");
}
public static String getUsername () {
return System.getProperty("username");
}
public static String getPassword() {
return System.getProperty("password");
}
public static Properties getProperties(List<HostSpec> hostSpecs) {
Properties properties = new Properties();
properties.put("user", getUsername());
properties.put("password", getPassword());
Properties info = new Properties(properties);
info.put("PGDBNAME", getDatabase());
info.put("PGPORT", hostSpecs.stream()
.map(o -> String.valueOf(o.getPort()))
.collect(joining(",")));
info.put("PGHOSTURL", hostSpecs.stream()
.map(HostSpec::getHost)
.collect(joining(",")));
info.put("PGHOST", hostSpecs.stream()
.map(HostSpec::getHost)
.collect(joining(",")));
info.put("PGPORTURL", hostSpecs.stream()
.map(o -> String.valueOf(o.getPort()))
.collect(joining(",")));
return info;
}
}

View File

@ -0,0 +1,155 @@
package org.postgresql.clusterhealthy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static java.util.stream.Collectors.joining;
import static org.postgresql.clusterhealthy.ClusterNodeCache.checkHostSpecs;
public class ClusterNodeCacheTest {
private List<HostSpec> getHostSpecs() {
List<HostSpec> hostSpecList = new ArrayList<>();
hostSpecList.add(new HostSpec(System.getProperty("server"),
Integer.parseInt(System.getProperty("port"))));
hostSpecList.add(new HostSpec(System.getProperty("secondaryServer"),
Integer.parseInt(System.getProperty("secondaryPort"))));
hostSpecList.add(new HostSpec(System.getProperty("secondaryServer2"),
Integer.parseInt(System.getProperty("secondaryServerPort2"))));
return hostSpecList;
}
private static String getUrl() {
List<String> list = new ArrayList<>();
list.add(System.getProperty("server") + ":" + System.getProperty("port"));
list.add(System.getProperty("secondaryServer") + ":" + System.getProperty("secondaryPort"));
list.add(System.getProperty("secondaryServer2") + ":" + System.getProperty("secondaryServerPort2"));
String serverAndPort = list.stream()
.collect(joining(","));
String database = getDatabase();
return String.format("jdbc:postgresql://%s/%s", serverAndPort, database);
}
private static String getDatabase() {
return System.getProperty("database");
}
private static String getUsername () {
return System.getProperty("username");
}
private static String getPassword() {
return System.getProperty("password");
}
private static Properties getProperties(List<HostSpec> hostSpecs) {
Properties properties = new Properties();
properties.put("user", getUsername());
properties.put("password", getPassword());
Properties info = new Properties(properties);
info.put("PGDBNAME", getDatabase());
info.put("PGPORT", hostSpecs.stream()
.map(o -> String.valueOf(o.getPort()))
.collect(joining(",")));
info.put("PGHOSTURL", hostSpecs.stream()
.map(HostSpec::getHost)
.collect(joining(",")));
info.put("PGHOST", hostSpecs.stream()
.map(HostSpec::getHost)
.collect(joining(",")));
info.put("PGPORTURL", hostSpecs.stream()
.map(o -> String.valueOf(o.getPort()))
.collect(joining(",")));
return info;
}
@Before
public void initDirver() throws Exception {
TestUtil.initDriver();
}
@Test
public void getClusterConnection() throws SQLException {
// Class.forName("org.postgresql.Driver");
DriverManager.getConnection(getUrl(), getUsername(), getPassword());
}
@Test
public void testPeriodTime() throws Exception {
String url = getUrl() + "?targetServerType=master";
DriverManager.getConnection(getUrl(), getUsername(), getPassword());
String newUrl = url + "&heartbeatPeriod=3000";
DriverManager.getConnection(newUrl, getUsername(), getPassword());
Thread.sleep(10000L);
}
@Test
public void testUnavailablePassword() throws ClassNotFoundException, SQLException, InterruptedException {
// Class.forName("org.postgresql.Driver");
String url = getUrl() + "?targetServerType=master";
DriverManager.getConnection(url, getUsername(), getPassword());
HostSpec master = new HostSpec(System.getProperty("server"),
Integer.parseInt(System.getProperty("port")));
ClusterHeartBeatMaster instance = ClusterHeartBeatMaster.getInstance();
Set<Properties> properties = instance.getProperties(master);
int before = properties.size();
// Change password
Thread.sleep(10000L);
Set<Properties> afterProperties = instance.getProperties(master);
int after = afterProperties.size();
Assert.assertNotEquals(after, before);
Thread.sleep(10000L);
}
@Test
public void testCheckReplacement() {
HostSpec master = new HostSpec(System.getProperty("server"),
Integer.parseInt(System.getProperty("port")));
Map<HostSpec, HostSpec> failureMap = ClusterHeartBeatFailureMaster.failureMap;
HostSpec node = new HostSpec("10.10.0.1", 2525);
failureMap.put(master, node);
List<HostSpec> hostSpecList = getHostSpecs();
HostSpec[] hostSpecs = hostSpecList.toArray(new HostSpec[0]);
ClusterNodeCache.checkReplacement(hostSpecs);
boolean containsMaster = false;
boolean containsNode = false;
for (HostSpec hostSpec : hostSpecs) {
if (hostSpec.equals(master)) {
containsMaster = true;
}
if (hostSpec.equals(node)) {
containsNode = true;
}
}
Assert.assertTrue(containsNode);
Assert.assertFalse(containsMaster);
}
@Test
public void testpushHostSpecs() throws Exception {
List<HostSpec> hostSpecs = getHostSpecs();
Properties info = getProperties(hostSpecs);
checkHostSpecs(hostSpecs.toArray(new HostSpec[0]), info);
System.out.println(ClusterNodeCache.isOpen());
Thread.sleep(30000);
}
}