jdbc实现leastconn最小连接模式和集群状态变化时的快速负载均衡

This commit is contained in:
congzhou2603
2023-02-14 14:52:40 +08:00
parent 803aed58bc
commit 7dfd142535
18 changed files with 3923 additions and 10 deletions

View File

@ -11,6 +11,8 @@ import org.postgresql.jdbc.PgConnection;
import org.postgresql.log.Logger;
import org.postgresql.log.Log;
import org.postgresql.log.Tracer;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.LoadBalanceHeartBeating;
import org.postgresql.util.DriverInfo;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
@ -558,8 +560,12 @@ public class Driver implements java.sql.Driver {
* @throws SQLException if the connection could not be made
*/
private static Connection makeConnection(String url, Properties props) throws SQLException {
ConnectionManager.getInstance().setCluster(props);
PgConnection pgConnection = new PgConnection(hostSpecs(props), user(props), database(props), props, url);
GlobalConnectionTracker.possessConnectionReference(pgConnection.getQueryExecutor(), props);
if (ConnectionManager.getInstance().setConnection(pgConnection, props)) {
LoadBalanceHeartBeating.startScheduledExecutorService(props);
}
return pgConnection;
}

View File

@ -473,6 +473,42 @@ public enum PGProperty {
+ "from that database. "
+ "(backend >= 9.4)"),
/**
* Enable quick auto balancing.
*
*/
ENABLE_QUICK_AUTO_BALANCE("enableQuickAutoBalance", "false",
"If the connection enable quickAutoBalance, this parameter only takes effect when autoBalance=leastconn."
+ "value: true or false.",
false, "true", "false"),
/**
* Idle time threshold of connections when quick auto balancing filters connections.
*/
MAX_IDLE_TIME_BEFORE_TERMINAL("maxIdleTimeBeforeTerminal", "30",
"During quick load balancing, if the connection is idle for more than maxIdleTimeBeforeTerminal seconds, "
+ "the connection may be closed to achieve load balancing for each data node."
+ "Value range: long && [0, Long.MAX_VALUE / 1000]."
+ "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"),
/**
* Percentage of min reserved connections pre cluster when executing quick auto balancing.
*/
MIN_RESERVED_CON_PER_CLUSTER("minReservedConPerCluster", null,
"Percentage of min reserved connections pre cluster when executing quick auto balancing, "
+ "jdbc will retain minReservedConPerCluster percent of the connections per cluster that meet the closing conditions during quick auto balancing."
+ "Value range: int && [0, 100]."
+ "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"),
/**
* Percentage of min reserved connections pre data node when executing quick auto balancing.
*/
MIN_RESERVED_CON_PER_DATANODE("minReservedConPerDatanode", null,
"Percentage of min reserved connections pre data node when executing quick auto balancing, "
+ "jdbc will retain minReservedConPerCluster percent of the connections pre data node that meet the closing conditions during quick auto balancing."
+ "Value range: int && [0, 100]."
+ "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"),
/**
* Supported TLS cipher suites
*/

View File

@ -20,6 +20,7 @@ import org.postgresql.core.Version;
import org.postgresql.hostchooser.*;
import org.postgresql.jdbc.SslMode;
import org.postgresql.QueryCNListUtils;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.util.*;
import org.postgresql.log.Logger;
import org.postgresql.log.Log;
@ -214,6 +215,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
HostChooserFactory.createHostChooser(currentHostSpecs, targetServerType, info);
Iterator<CandidateHost> hostIter = hostChooser.iterator();
boolean isMasterCluster = false;
boolean isFirstIter = true;
while (hostIter.hasNext()) {
CandidateHost candidateHost = hostIter.next();
HostSpec hostSpec = candidateHost.hostSpec;
@ -223,10 +225,16 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
// In that case, the system tries to connect to each host in order, thus it should not look into
// GlobalHostStatusTracker
HostStatus knownStatus = knownStates.get(hostSpec);
if (isFirstIter) {
isFirstIter = false;
} else {
ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpec, info);
}
if (knownStatus != null && !candidateHost.targetServerType.allowConnectingTo(knownStatus)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Known status of host " + hostSpec + " is " + knownStatus + ", and required status was " + candidateHost.targetServerType + ". Will try next host");
}
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
continue;
}
@ -308,6 +316,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
GlobalClusterStatusTracker.reportMasterCluster(info, clusterSpec);
} else {
queryExecutor.close();
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
break;
}
}
@ -322,6 +331,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
knownStates.put(hostSpec, hostStatus);
if (!candidateHost.targetServerType.allowConnectingTo(hostStatus)) {
queryExecutor.close();
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
continue;
}
@ -349,12 +359,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
if (hostIter.hasNext() || clusterIter.hasNext()) {
LOGGER.info("ConnectException occured while connecting to {0}" + hostSpec, cex);
exception.addSuppressed(cex);
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
// still more addresses to try
continue;
}
if (exception.getSuppressed().length > 0) {
cex.addSuppressed(exception);
}
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
throw new PSQLException(GT.tr(
"Connection to {0} refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.",
hostSpec), PSQLState.CONNECTION_UNABLE_TO_CONNECT, cex);
@ -365,12 +377,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
if (hostIter.hasNext() || clusterIter.hasNext()) {
LOGGER.info("IOException occured while connecting to " + hostSpec, ioe);
exception.addSuppressed(ioe);
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
// still more addresses to try
continue;
}
if (exception.getSuppressed().length > 0) {
ioe.addSuppressed(exception);
}
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
throw new PSQLException(GT.tr("The connection attempt failed."),
PSQLState.CONNECTION_UNABLE_TO_CONNECT, ioe);
} catch (SQLException se) {
@ -381,11 +395,13 @@ public class ConnectionFactoryImpl extends ConnectionFactory {
LOGGER.info("SQLException occured while connecting to " + hostSpec, se);
exception.addSuppressed(se);
// still more addresses to try
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
continue;
}
if (exception.getSuppressed().length > 0) {
se.addSuppressed(exception);
}
ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info);
throw se;
}
}

View File

@ -12,6 +12,8 @@ import org.postgresql.PGProperty;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.QueryCNListUtils;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.Cluster;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
@ -95,6 +97,7 @@ public class MultiHostChooser implements HostChooser {
allHosts = priorityRoundRobin(allHosts);
break;
case LeastConn:
allHosts = leastConn(allHosts);
break;
default:
isOutPutLog = false;
@ -108,6 +111,7 @@ public class MultiHostChooser implements HostChooser {
}
return allHosts;
}
// Returns a counter and increments it by one.
// Because it is possible to use it in multiple instances, use synchronized (MultiHostChooser.class).
private int getRRIndex() {
@ -138,6 +142,17 @@ public class MultiHostChooser implements HostChooser {
return result;
}
private List<HostSpec> leastConn(List<HostSpec> hostSpecs) {
if (hostSpecs.size() <= 1) {
return hostSpecs;
}
Cluster cluster = ConnectionManager.getInstance().getCluster(URLIdentifier);
if (cluster == null) {
return hostSpecs;
}
return cluster.sortDnsByLeastConn(hostSpecs);
}
/*
* Use for RR algorithm. In case of first CN is not been connected, jdbc will
* try to connect the second one. So shuffering all CN except of the first one

View File

@ -7,6 +7,8 @@ package org.postgresql.jdbc;
import org.postgresql.Driver;
import org.postgresql.core.*;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.LoadBalanceHeartBeating;
import org.postgresql.util.GT;
import org.postgresql.util.PGbytea;
import org.postgresql.util.PSQLException;
@ -22,7 +24,6 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import org.postgresql.core.v3.ConnectionFactoryImpl;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -1117,12 +1118,14 @@ public class PgStatement implements Statement, BaseStatement {
// Not in query, there's nothing to cancel
return;
}
setConnectionState(StatementCancelState.CANCELING);
// Synchronize on connection to avoid spinning in killTimerTask
synchronized (connection) {
try {
connection.cancelQuery();
} finally {
STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
setConnectionState(StatementCancelState.CANCELLED);
connection.notifyAll(); // wake-up killTimerTask
}
}
@ -1166,14 +1169,14 @@ public class PgStatement implements Statement, BaseStatement {
fetchSize = rows;
}
private void startTimer() {
private void startTimer() throws SQLException {
/*
* there shouldn't be any previous timer active, but better safe than sorry.
*/
cleanupTimer();
STATE_UPDATER.set(this, StatementCancelState.IN_QUERY);
setConnectionState(StatementCancelState.IN_QUERY);
if (timeout == 0) {
return;
}
@ -1218,13 +1221,20 @@ public class PgStatement implements Statement, BaseStatement {
return true;
}
private void killTimerTask() {
private void setConnectionState(StatementCancelState state) throws SQLException {
if (LoadBalanceHeartBeating.isLoadBalanceHeartBeatingStarted() && this.connection instanceof PgConnection) {
ConnectionManager.getInstance().setConnectionState(this.connection.unwrap(PgConnection.class), state);
}
}
private void killTimerTask() throws SQLException {
boolean timerTaskIsClear = cleanupTimer();
// The order is important here: in case we need to wait for the cancel task, the state must be
// kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.
// It is believed that this case is very rare, so "additional cancel and wait below" would not
// harm it.
if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) {
setConnectionState(StatementCancelState.IDLE);
return;
}
@ -1245,6 +1255,7 @@ public class PgStatement implements Statement, BaseStatement {
interrupted = true;
}
}
setConnectionState(StatementCancelState.IDLE);
}
if (interrupted) {
Thread.currentThread().interrupt();

View File

@ -8,7 +8,7 @@ package org.postgresql.jdbc;
/**
* Represents {@link PgStatement#cancel()} state.
*/
enum StatementCancelState {
public enum StatementCancelState {
IDLE,
IN_QUERY,
CANCELING,

View File

@ -0,0 +1,626 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.Driver;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.quickautobalance.DataNode.CheckDnStateResult;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A Cluster, which cached the information of the dataNodes in this cluster.
*/
public class Cluster {
private static Log LOGGER = Logger.getLogger(Cluster.class.getName());
private static final double CLOSE_CONNECTION_PERCENTAGE_EACH_TIME = 0.2d;
private final String urlIdentifier;
private final Set<HostSpec> dns;
private final Queue<ConnectionInfo> abandonedConnectionList;
private final Map<HostSpec, DataNode> cachedDnList;
private final List<Properties> cachedPropertiesList;
// Percentage of minimum reserved connections that can be closed in a cluster, value range: [0,100].
private volatile int minReservedConPerCluster;
private volatile boolean enableMinReservedConPerCluster;
// Percentage of minimum reserved connections that can be closed in a datanode, value range: [0,100].
private volatile int minReservedConPerDatanode;
private volatile boolean enableMinReservedConPerDatanode;
private volatile long quickAutoBalanceStartTime;
private int totalAbandonedConnectionSize = 0;
public Cluster(final String urlIdentifier, final Properties properties) throws PSQLException {
this.urlIdentifier = urlIdentifier;
HostSpec[] hostSpecs = Driver.getURLHostSpecs(properties);
this.dns = new HashSet<>();
this.dns.addAll(Arrays.asList(hostSpecs));
this.cachedDnList = new ConcurrentHashMap<>();
for (HostSpec hostSpec : hostSpecs) {
DataNode dataNode = new DataNode(hostSpec);
this.cachedDnList.put(hostSpec, dataNode);
}
updateParams(properties);
this.abandonedConnectionList = new ConcurrentLinkedQueue<>();
this.cachedPropertiesList = new Vector<>();
this.cachedPropertiesList.add(properties);
this.quickAutoBalanceStartTime = 0;
this.totalAbandonedConnectionSize = 0;
}
/**
* set connection state of cached connections if it exists.
*
* @param pgConnection pgConnection
* @param state new state
*/
public void setConnectionState(final PgConnection pgConnection, final StatementCancelState state) {
String socketAddress = pgConnection.getSocketAddress();
HostSpec hostSpec = calculateHostSpec(socketAddress);
if (hostSpec != null && !dns.contains(hostSpec)) {
return;
}
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode != null) {
dataNode.setConnectionState(pgConnection, state);
}
}
// calculate the hostSpec of destination
private HostSpec calculateHostSpec(String socketAddress) {
String urlClient = socketAddress.split("/")[1];
String[] urlClientSplit = urlClient.split(":");
if (urlClientSplit.length == 2) {
String host = urlClientSplit[0];
int port = Integer.parseInt(urlClientSplit[1]);
return new HostSpec(host, port);
} else {
return null;
}
}
/**
* Add a new connection.
*
* @param pgConnection pgConnection
* @param properties properties
*/
public void setConnection(final PgConnection pgConnection, final Properties properties)
throws PSQLException {
if (pgConnection == null || properties == null) {
return;
}
String socketAddress = pgConnection.getSocketAddress();
HostSpec hostSpec = calculateHostSpec(socketAddress);
if (hostSpec != null && !dns.contains(hostSpec)) {
return;
}
setProperties(properties);
synchronized (cachedDnList) {
cachedDnList.get(hostSpec).setConnection(pgConnection, properties, hostSpec);
decrementCachedCreatingConnectionSize(hostSpec);
updateParams(properties);
}
}
/**
* Set new properties if the user doesn't exist in cachedPropertiesList,
* or update properties if exists.
*
* @param properties properties
*/
public void setProperties(Properties properties) {
synchronized (cachedPropertiesList) {
for (int i = 0; i < cachedPropertiesList.size(); i++) {
Properties prop = cachedPropertiesList.get(i);
if (prop.getProperty("user", "").equals(properties.getProperty("user", null))) {
cachedPropertiesList.set(i, properties);
return;
}
}
cachedPropertiesList.add(properties);
}
}
/**
* CacheCreatingConnectionNum - 1;
*
* @param hostSpec hostSpec
* @return cachedCreatingConnectionNum
*/
public int decrementCachedCreatingConnectionSize(final HostSpec hostSpec) {
if (!cachedDnList.containsKey(hostSpec)) {
LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", hostSpec.toString(), urlIdentifier));
return 0;
}
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode != null) {
return dataNode.decrementCachedCreatingConnectionSize();
} else {
LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", hostSpec.toString(), urlIdentifier));
return 0;
}
}
private void updateMinReservedConPerCluster(Properties properties) throws PSQLException {
int perCluster = 0;
String param = PGProperty.MIN_RESERVED_CON_PER_CLUSTER.get(properties);
if (param == null) {
return;
}
try {
perCluster = Integer.parseInt(param);
} catch (NumberFormatException e) {
throw new PSQLException(GT.tr("Parameter minReservedConPerCluster={0} parsed failed, value range: int && [0, 100]."
, PGProperty.MIN_RESERVED_CON_PER_CLUSTER.get(properties)), PSQLState.INVALID_PARAMETER_TYPE);
}
if (perCluster < 0 || perCluster > 100) {
throw new PSQLException(GT.tr("Parameter minReservedConPerCluster={0} parsed failed, value range: int && [0, 100]."
, perCluster), PSQLState.INVALID_PARAMETER_VALUE);
} else {
if (!this.enableMinReservedConPerCluster) {
this.enableMinReservedConPerCluster = true;
this.minReservedConPerCluster = perCluster;
} else {
this.minReservedConPerCluster = Math.min(this.minReservedConPerCluster, perCluster);
}
}
}
private void updateMinReservedConPerDatanode(Properties properties) throws PSQLException {
int perDatanode = 0;
String param = PGProperty.MIN_RESERVED_CON_PER_DATANODE.get(properties);
if (param == null) {
return;
}
try {
perDatanode = Integer.parseInt(param);
} catch (NumberFormatException e) {
throw new PSQLException(GT.tr("Parameter minReservedConPerDatanode={0} parsed failed, value range: int && [0, 100]."
, PGProperty.MIN_RESERVED_CON_PER_DATANODE.get(properties)), PSQLState.INVALID_PARAMETER_TYPE);
}
if (perDatanode < 0 || perDatanode > 100) {
throw new PSQLException(GT.tr("Parameter minReservedConPerDatanode={0} parsed failed, value range: int && [0, 100].", perDatanode), PSQLState.INVALID_PARAMETER_VALUE);
} else {
if (!this.enableMinReservedConPerDatanode) {
this.enableMinReservedConPerDatanode = true;
this.minReservedConPerDatanode = perDatanode;
} else {
this.minReservedConPerDatanode = Math.min(this.minReservedConPerDatanode, perDatanode);
}
}
}
private void updateParams(Properties properties) throws PSQLException {
updateMinReservedConPerCluster(properties);
updateMinReservedConPerDatanode(properties);
}
/**
* Get connection info of the connection if exists.
*
* @param connection connection
* @return connection info
*/
public ConnectionInfo getConnectionInfo(PgConnection connection) {
String socketAddress = connection.getSocketAddress();
HostSpec hostSpec = calculateHostSpec(socketAddress);
DataNode dataNode = cachedDnList.get(hostSpec);
return hostSpec != null && dataNode != null ? dataNode.getConnectionInfo(connection) : null;
}
/**
* Sort hostSpec list in ascending order by amount of connections.
* Put the hostSpec to the tail, if dataNodeState = false.
*
* @param hostSpecs host specs
* @return hostSpec list
*/
public List<HostSpec> sortDnsByLeastConn(List<HostSpec> hostSpecs) {
Map<HostSpec, DataNodeCompareInfo> dataNodeCompareInfoMap;
// Copy dataNodeCompareInfo from cachedDnList.
synchronized (cachedDnList) {
dataNodeCompareInfoMap = hostSpecs.stream()
.collect(Collectors.toMap(Function.identity(), hostSpec -> {
DataNode dataNode = cachedDnList.get(hostSpec);
int cachedConnectionListSize = dataNode.getCachedConnectionListSize();
int cachedCreatingConnectionSize = dataNode.getCachedCreatingConnectionSize();
boolean dataNodeState = dataNode.getDataNodeState();
return new DataNodeCompareInfo(cachedConnectionListSize, cachedCreatingConnectionSize, dataNodeState);
}));
}
hostSpecs.sort((o1, o2) -> {
boolean o1State = dataNodeCompareInfoMap.get(o1).getDataNodeState();
boolean o2State = dataNodeCompareInfoMap.get(o2).getDataNodeState();
if (!o1State && o2State) {
return 1;
}
if (!o2State && o1State) {
return -1;
}
int o1ConnectionSize = dataNodeCompareInfoMap.get(o1).getConnectionListSize() + dataNodeCompareInfoMap.get(o1).getCachedCreatedConnectionSize();
int o2ConnectionSize = dataNodeCompareInfoMap.get(o2).getConnectionListSize() + dataNodeCompareInfoMap.get(o2).getCachedCreatedConnectionSize();
return o1ConnectionSize - o2ConnectionSize;
});
if (hostSpecs.get(0) != null) {
this.cachedDnList.get(hostSpecs.get(0)).incrementCachedCreatingConnectionSize();
}
LOGGER.info(GT.tr("SortDnsByLeastConn: {0}."
, dataNodeCompareInfoMap));
return hostSpecs;
}
/**
* The necessary info when sorting data nodes.
*/
class DataNodeCompareInfo {
int connectionListSize;
int cachedCreatedConnectionSize;
boolean dataNodeState;
public DataNodeCompareInfo(final int connectionListSize, final int cachedCreatedConnectionSize, final boolean dataNodeState) {
this.connectionListSize = connectionListSize;
this.cachedCreatedConnectionSize = cachedCreatedConnectionSize;
this.dataNodeState = dataNodeState;
}
/**
* Get connectionList size.
*
* @return size of connectionList
*/
public int getConnectionListSize() {
return connectionListSize;
}
/**
* Get cached created connection size.
*
* @return cachedCreateConnectionSize
*/
public int getCachedCreatedConnectionSize() {
return cachedCreatedConnectionSize;
}
/**
* Get data node state.
*
* @return dataNodeState
*/
public boolean getDataNodeState() {
return dataNodeState;
}
@Override
public String toString() {
return "{" +
"connectionListSize=" + connectionListSize +
", cachedCreatedConnectionSize=" + cachedCreatedConnectionSize +
", dataNodeState=" + dataNodeState +
'}';
}
}
/**
* Check each data nodes' validity of cluster,
* use cached properties to tryConnect to each data nodes,
* if cached properties is invalid, remove cached properties,
* if state change to valid from invalid, quick load balancing start,
* if state change to invalid from valid, clear cached connections of this node.
*
* @return the number of invalid data nodes
*/
public int checkClusterState() {
// Count the state of all data nodes before and after checking cluster state.
Map<HostSpec, Boolean> oldStates = cachedDnList.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey,
(val) -> val.getValue().getDataNodeState()));
Map<HostSpec, Boolean> newStates = cachedDnList.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey,
(val) -> this.checkDnState(val.getKey())));
Map<DataNodeChangedState, List<HostSpec>> checkResult = new HashMap<>();
for (DataNodeChangedState dataNodeChangedState : DataNodeChangedState.values()) {
checkResult.put(dataNodeChangedState, new ArrayList<>());
}
for (Map.Entry<HostSpec, DataNode> entry : cachedDnList.entrySet()) {
HostSpec hostSpec = entry.getKey();
boolean oldState = oldStates.get(hostSpec);
boolean newState = newStates.get(hostSpec);
if (oldState && !newState) {
// If data node fails, clear its cacheConnectionList.
int removed = cachedDnList.get(hostSpec).clearCachedConnections();
checkResult.get(DataNodeChangedState.CHANGE_TO_INVALID).add(hostSpec);
LOGGER.info(GT.tr("A data node failed, clear cached connections, cluster: {0}, " +
"hostSpec: {1}, cached connections: {2}.",
urlIdentifier, hostSpec.toString(), removed));
} else if (!oldState && newState) {
checkResult.get(DataNodeChangedState.CHANGE_TO_VALID).add(hostSpec);
} else if (oldState) {
checkResult.get(DataNodeChangedState.KEEP_VALID).add(hostSpec);
} else {
checkResult.get(DataNodeChangedState.KEEP_INVALID).add(hostSpec);
}
}
LOGGER.info(GT.tr("Check cluster states in cluster: {0}, result: {1}.",
urlIdentifier, checkResult.toString()));
// Start to quickLoadBalance.
if (checkResult.get(DataNodeChangedState.CHANGE_TO_VALID).size() != 0
&& LoadBalanceHeartBeating.isQuickAutoBalanceStarted()) {
quickLoadBalance(checkResult.get(DataNodeChangedState.KEEP_VALID));
}
return checkResult.get(DataNodeChangedState.KEEP_INVALID).size() +
checkResult.get(DataNodeChangedState.CHANGE_TO_INVALID).size();
}
enum DataNodeChangedState {
KEEP_VALID, KEEP_INVALID, CHANGE_TO_VALID, CHANGE_TO_INVALID
}
/**
* Check dn state by cached properties, and remove invalid properties.
*
* @param hostSpec hostSpec
* @return state of the date node
*/
public boolean checkDnState(HostSpec hostSpec) {
synchronized (cachedPropertiesList) {
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode == null) {
return false;
}
for (Iterator<Properties> iterator = cachedPropertiesList.iterator(); iterator.hasNext(); ) {
Properties properties = iterator.next();
CheckDnStateResult result = dataNode.checkDnStateAndProperties(properties);
if (CheckDnStateResult.DN_VALID.equals(result)) {
dataNode.setDataNodeState(true);
return true;
} else if (CheckDnStateResult.DN_INVALID.equals(result)) {
dataNode.setDataNodeState(false);
return false;
} else {
iterator.remove();
}
}
dataNode.setDataNodeState(false);
return false;
}
}
private int quickLoadBalance(List<HostSpec> validDns) {
synchronized (abandonedConnectionList) {
this.quickAutoBalanceStartTime = System.currentTimeMillis();
// the connections added into abandonedConnectionList
int removed = 0;
// cachedConnectionList size
int total = 0;
// idle connection filtered from cachedConnectionList
int idle = 0;
int minReservedConnectionPercentage;
if (!enableMinReservedConPerCluster) {
minReservedConnectionPercentage = minReservedConPerDatanode;
} else if (!enableMinReservedConPerDatanode) {
minReservedConnectionPercentage = minReservedConPerCluster;
} else {
minReservedConnectionPercentage = Math.max(minReservedConPerDatanode, minReservedConPerCluster);
}
for (Entry<HostSpec, DataNode> entry : cachedDnList.entrySet()) {
DataNode dataNode = entry.getValue();
if (dataNode != null) {
total += dataNode.getCachedConnectionListSize();
}
}
// Start to quickLoadBalance.
HashSet<ConnectionInfo> removedConnectionList = new HashSet<>();
for (HostSpec hostSpec : validDns) {
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode != null) {
List<ConnectionInfo> idleConnections = dataNode.filterIdleConnections(quickAutoBalanceStartTime);
idle += idleConnections.size();
int removedConnectionsSize = (int) (idleConnections.size() * (((double) (100 - minReservedConnectionPercentage)) / 100.0));
for (int i = 0; i < removedConnectionsSize; i++) {
removedConnectionList.add(idleConnections.get(i));
removed++;
}
}
}
this.abandonedConnectionList.clear();
this.abandonedConnectionList.addAll(removedConnectionList);
this.totalAbandonedConnectionSize = abandonedConnectionList.size();
LOGGER.info(GT.tr("QuickLoadBalancing executes in cluster: {0}, " +
"put {1} idle connections into abandonedConnectionList, connections can be closed: {2}, " +
"total connection: {3}, minReservedConPerCluster: {4}, minReservedConPerDatanode: {5}.",
urlIdentifier, removed, idle, total, this.minReservedConPerCluster, this.minReservedConPerDatanode));
return removed;
}
}
/**
* Check cached connections validity, and remove invalid connections.
*
* @return the amount of removed connections of each dn.
*/
public List<Integer> checkConnectionsValidity() {
List<Integer> ans = new ArrayList<>();
for (Entry<HostSpec, DataNode> entry : cachedDnList.entrySet()) {
DataNode dataNode = entry.getValue();
ans.add(dataNode.checkConnectionsValidity());
}
return ans;
}
/**
* CachedCreatingConnection + 1
*
* @param hostSpec hostSpec
* @return cachedCreatingConnection
*/
public int incrementCachedCreatingConnectionSize(final HostSpec hostSpec) {
if (!cachedDnList.containsKey(hostSpec)) {
LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.",
hostSpec.toString(), urlIdentifier));
return 0;
}
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode != null) {
return dataNode.incrementCachedCreatingConnectionSize();
} else {
LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.",
hostSpec.toString(), urlIdentifier));
return 0;
}
}
/**
* Get minReservedConPerCluster.
*
* @return minReservedConPerCluster
*/
public int getMinReservedConPerCluster() {
return minReservedConPerCluster;
}
/**
* Get enableMinReservedConPerCluster.
*
* @return enableMinReservedConPerCluster
*/
public boolean isEnableMinReservedConPerCluster() {
return enableMinReservedConPerCluster;
}
/**
* Get minReservedConPerDatanode.
*
* @return minReservedConPerDatanode
*/
public int getMinReservedConPerDatanode() {
return minReservedConPerDatanode;
}
/**
* Get enableMInReservedConPerDatanode.
*
* @return enableMInReservedConPerDatanode
*/
public boolean isEnableMinReservedConPerDatanode() {
return enableMinReservedConPerDatanode;
}
@Override
public int hashCode() {
return Objects.hash(urlIdentifier, dns, abandonedConnectionList, cachedDnList, cachedPropertiesList);
}
@Override
public boolean equals(final Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
final Cluster that = (Cluster) o;
return Objects.equals(urlIdentifier, that.urlIdentifier) && Objects.equals(dns, that.dns);
}
/**
* Close connections from abandonedConnectionList.
* Jdbc will continue to pop connections from abandonedConnectionList and determine whether each connection can be closed.
* If it does, the connection will be close. If it does not, the connection won't be put back into abandonedConnectionList.
* The number of connections closed at each cluster for each scheduled task at most: 20% * totalAbandonedConnectionSize.
*
* @return the number of connections which are closed
*/
public int closeConnections() {
int closed = 0;
double ceilError = 0.001;
int atMost = (int) (Math.ceil(CLOSE_CONNECTION_PERCENTAGE_EACH_TIME * totalAbandonedConnectionSize) + + ceilError);
synchronized (abandonedConnectionList) {
if (abandonedConnectionList.isEmpty()) {
return closed;
}
int oldSize = abandonedConnectionList.size();
while (!abandonedConnectionList.isEmpty() && closed < atMost) {
ConnectionInfo connectionInfo = abandonedConnectionList.poll();
HostSpec hostSpec = connectionInfo.getHostSpec();
// The connections shouldn't be null.
if (hostSpec == null) {
continue;
}
// The connections should be valid.
if (!connectionInfo.checkConnectionIsValid()) {
continue;
}
// The state of connection may change after put into abandonedConnectionList,
// so it's necessary to recheck it which can be closed.
if (!connectionInfo.checkConnectionCanBeClosed(quickAutoBalanceStartTime)) {
continue;
}
DataNode dataNode = cachedDnList.get(hostSpec);
if (dataNode == null) {
continue;
}
boolean hasClosed = dataNode.closeConnection(connectionInfo.getPgConnection());
if (hasClosed) {
closed++;
}
}
if (abandonedConnectionList.isEmpty()) {
this.quickAutoBalanceStartTime = 0;
this.totalAbandonedConnectionSize = 0;
}
LOGGER.info(GT.tr("Close connections execute in cluster: {0}, closed connections: {1}, " +
"size of abandonedConnectionList before closing: {2}," +
" size of abandonedConnectionList after closing: {3}.",
urlIdentifier, closed, oldSize, abandonedConnectionList.size()));
}
return closed;
}
}

View File

@ -0,0 +1,246 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.PGProperty;
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.SetupQueryRunner;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Objects;
import java.util.Properties;
/**
* Connection info used in quick auto balance.
*/
public class ConnectionInfo {
/**
* Default maxIdleTimeBeforeTerminal.
*/
public static final long DEFAULT_MAX_IDLE_TIME_BEFORE_TERMINAL = 30L;
public static final String ENABLE_QUICK_AUTO_BALANCE_PARAMS = "true";
private static Log LOGGER = Logger.getLogger(ConnectionInfo.class.getName());
private final PgConnection pgConnection;
private final long createTimeStamp;
private final String autoBalance;
private boolean enableQuickAutoBalance;
// max idle time of connection, units: second
private long maxIdleTimeBeforeTerminal;
private final HostSpec hostSpec;
private volatile StatementCancelState connectionState;
// the timestamp when state change last time
private volatile long stateLastChangedTimeStamp;
@Override
public int hashCode() {
return Objects.hash(pgConnection, createTimeStamp, autoBalance, enableQuickAutoBalance,
maxIdleTimeBeforeTerminal, hostSpec);
}
public ConnectionInfo(PgConnection pgConnection, Properties properties, HostSpec hostSpec)
throws PSQLException {
this.pgConnection = pgConnection;
this.connectionState = StatementCancelState.IDLE;
this.createTimeStamp = System.currentTimeMillis();
this.stateLastChangedTimeStamp = createTimeStamp;
this.autoBalance = properties.getProperty("autoBalance", "");
this.maxIdleTimeBeforeTerminal = DEFAULT_MAX_IDLE_TIME_BEFORE_TERMINAL;
this.hostSpec = hostSpec;
parseEnableQuickAutoBalance(properties);
parseMaxIdleTimeBeforeTerminal(properties);
}
private void parseEnableQuickAutoBalance(Properties properties) throws PSQLException {
if (EnableQuickAutoBalanceParams.TRUE.getValue()
.equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) {
this.enableQuickAutoBalance = true;
} else if (EnableQuickAutoBalanceParams.FALSE.getValue()
.equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) {
this.enableQuickAutoBalance = false;
} else {
throw new PSQLException(
GT.tr("Parameter enableQuickAutoBalance={0} parsed failed, value range: '{true, false'}).",
PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties)), PSQLState.INVALID_PARAMETER_VALUE);
}
}
private void parseMaxIdleTimeBeforeTerminal(Properties properties) throws PSQLException {
long inputMaxIdleTime;
try {
String param = PGProperty.MAX_IDLE_TIME_BEFORE_TERMINAL.get(properties);
inputMaxIdleTime = Long.parseLong(param);
if (inputMaxIdleTime >= Long.MAX_VALUE / 1000) {
throw new PSQLException(
GT.tr("Parameter maxIdleTimeBeforeTerminal={0} can not be bigger than {1}, value range: long & [0,{1}).",
inputMaxIdleTime, Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_VALUE);
}
if (inputMaxIdleTime < 0) {
throw new PSQLException(
GT.tr("Parameter maxIdleTimeBeforeTerminal={0} can not be less than 0, value range: long & [0,{1}).",
inputMaxIdleTime, Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_VALUE);
}
} catch (NumberFormatException e) {
throw new PSQLException(
GT.tr("Parameter maxIdleTimeBeforeTerminal parsed failed, value range: long & [0,{0}).",
Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_TYPE);
}
this.maxIdleTimeBeforeTerminal = inputMaxIdleTime;
}
enum EnableQuickAutoBalanceParams {
TRUE("true"),
FALSE("false");
private final String value;
EnableQuickAutoBalanceParams(String value) {
this.value = value;
}
/**
* Get value.
*
* @return value
*/
public String getValue() {
return this.value;
}
}
public StatementCancelState getConnectionState() {
return connectionState;
}
public synchronized void setConnectionState(StatementCancelState state) {
if (state != null && !connectionState.equals(state)) {
connectionState = state;
stateLastChangedTimeStamp = System.currentTimeMillis();
}
}
public long getMaxIdleTimeBeforeTerminal() {
return maxIdleTimeBeforeTerminal;
}
public String getAutoBalance() {
return autoBalance;
}
public PgConnection getPgConnection() {
return pgConnection;
}
@Override
public boolean equals(final Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
final ConnectionInfo that = (ConnectionInfo) o;
return createTimeStamp == that.createTimeStamp && enableQuickAutoBalance == that.enableQuickAutoBalance &&
maxIdleTimeBeforeTerminal == that.maxIdleTimeBeforeTerminal && pgConnection.equals(that.pgConnection) &&
autoBalance.equals(that.autoBalance) && hostSpec.equals(that.hostSpec);
}
/**
* Check whether the connection can be closed.
* The judgement conditions are as follows:
* 1. The connection enables quickAutoBalance.
* 2. The quickAutoBalance start time is later than the connection create time.
* 3. The connection state is idle.
* 4. The connection keeps idle at least maxIdleTimeBeforeTerminal seconds.
*
* @param quickAutoBalanceStartTime quickAutoBalance start time
* @return whether the connection can be closed
*/
public synchronized boolean checkConnectionCanBeClosed(long quickAutoBalanceStartTime) {
if (pgConnection == null) {
return false;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(GT.tr("checkConnectionCanBeClosed: server ip={0}, enableQuickAutoBalance={1}, " +
"quickAutoBalanceStartTime={2}, createTimeStamp={3}, connectionState={4}, " +
"stateLastChangedTimeStamp={5}, currentTimeMillis={6}",
hostSpec.toString(), isEnableQuickAutoBalance(), quickAutoBalanceStartTime, createTimeStamp,
connectionState, stateLastChangedTimeStamp, System.currentTimeMillis()));
}
if (!isEnableQuickAutoBalance()) {
return false;
}
if (quickAutoBalanceStartTime < createTimeStamp) {
return false;
}
if (!connectionState.equals(StatementCancelState.IDLE)) {
return false;
}
return System.currentTimeMillis() - stateLastChangedTimeStamp > maxIdleTimeBeforeTerminal * 1000;
}
public boolean isEnableQuickAutoBalance() {
return enableQuickAutoBalance;
}
/**
* Check whether a connection is valid.
*
* @return whether a connection is valid
*/
public boolean checkConnectionIsValid() {
boolean isConnectionValid;
try {
QueryExecutor queryExecutor = pgConnection.getQueryExecutor();
byte[][] bit = SetupQueryRunner.run(queryExecutor, "select 1", true);
if (bit == null) {
return false;
}
String result = queryExecutor.getEncoding().decode(bit[0]);
isConnectionValid = result != null && result.equals("1");
} catch (SQLException | IOException e) {
LOGGER.info(GT.tr("Check connection isValid failed."));
isConnectionValid = false;
}
return isConnectionValid;
}
/**
* get hostSpec
*
* @return hostSpec
*/
public HostSpec getHostSpec() {
return hostSpec;
}
}

View File

@ -0,0 +1,295 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.Driver;
import org.postgresql.QueryCNListUtils;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* Connection manager of quick load balancing.
*/
public class ConnectionManager {
private static final Log LOGGER = Logger.getLogger(ConnectionManager.class.getName());
private static final String AUTO_BALANCE = "autoBalance";
private static final String LEAST_CONN = "leastconn";
private final Map<String, Cluster> cachedClusters;
private ConnectionManager() {
this.cachedClusters = new ConcurrentHashMap<>();
}
/**
* Choose abandoned connections from abandonedConnections of each cluster.
* The number of connections that each cluster closes : CLOSE_CONNECTION_PERIOD * CLOSE_CONNECTION_PER_SECOND.
*
* @return the number of connections closed per cluster
*/
public List<Integer> closeConnections() {
List<Integer> ans = new ArrayList<>();
for (Entry<String, Cluster> entry : cachedClusters.entrySet()) {
Cluster cluster = entry.getValue();
int num = 0;
num += cluster != null ? cluster.closeConnections() : 0;
ans.add(num);
}
return ans;
}
/**
* check whether the properties enable leastconn.
*
* @param properties properties
* @return whether the properties enable leastconn.
*/
public static boolean checkEnableLeastConn(Properties properties) {
if (properties == null) {
return false;
}
if (!LEAST_CONN.equals(properties.getProperty(AUTO_BALANCE, ""))) {
return false;
}
HostSpec[] hostSpecs = Driver.getURLHostSpecs(properties);
if (hostSpecs.length <= 1) {
return false;
}
return true;
}
private static class Holder {
private static final ConnectionManager INSTANCE = new ConnectionManager();
}
public static ConnectionManager getInstance() {
return Holder.INSTANCE;
}
/**
* Set cluster into connection manager.
*
* @param properties properties
* @return set or not.
*/
public boolean setCluster(Properties properties) throws PSQLException {
if (!checkEnableLeastConn(properties)) {
return false;
}
String urlIdentifier = QueryCNListUtils.keyFromURL(properties);
// create a cluster if it doesn't exist in cachedClusters.
if (!cachedClusters.containsKey(urlIdentifier)) {
synchronized (cachedClusters) {
if (!cachedClusters.containsKey(urlIdentifier)) {
Cluster cluster = new Cluster(urlIdentifier, properties);
cachedClusters.put(urlIdentifier, cluster);
return true;
} else {
return false;
}
}
} else {
return false;
}
}
/**
* Cache this connection, if it's configured with autoBalance = "leastconn".
*
* @param pgConnection connection
* @param properties properties
* @return if insert the connection into connectionManager.
*/
public boolean setConnection(PgConnection pgConnection, Properties properties) throws PSQLException {
if (!checkEnableLeastConn(properties)) {
return false;
}
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
// create a cluster if it doesn't exist in cachedClusters.
if (!cachedClusters.containsKey(URLIdentifier)) {
synchronized (cachedClusters) {
if (!cachedClusters.containsKey(URLIdentifier)) {
Cluster cluster = new Cluster(URLIdentifier, properties);
cluster.setConnection(pgConnection, properties);
cachedClusters.put(URLIdentifier, cluster);
}
}
} else {
cachedClusters.get(URLIdentifier).setConnection(pgConnection, properties);
}
return true;
}
/**
* Set connection state of cached connections if it exists.
*
* @param pgConnection pgConnection
* @param state state
*/
public void setConnectionState(PgConnection pgConnection, StatementCancelState state) throws PSQLException {
String url;
try {
url = pgConnection.getURL();
} catch (SQLException e) {
LOGGER.error(GT.tr("Can't get url from pgConnection."));
return;
}
String URLIdentifier = getURLIdentifierFromUrl(url);
Cluster cluster = cachedClusters.get(URLIdentifier);
if (cluster != null) {
cluster.setConnectionState(pgConnection, state);
}
}
public Cluster getCluster(String URLIdentifier) {
return cachedClusters.get(URLIdentifier);
}
/**
* Get URLIdentifier from url, which is a unique id of cluster.
*
* @param url url
* @return URLIdentifier
*/
public static String getURLIdentifierFromUrl(String url) throws PSQLException {
HostSpec[] hostSpecs;
try {
String pgHostUrl = url.split("//")[1].split("/")[0];
String[] pgHosts = pgHostUrl.split(",");
hostSpecs = new HostSpec[pgHosts.length];
for (int i = 0; i < hostSpecs.length; i++) {
hostSpecs[i] = new HostSpec(pgHosts[i].split(":")[0],
Integer.parseInt(pgHosts[i].split(":")[1]));
}
Arrays.sort(hostSpecs);
} catch (ArrayIndexOutOfBoundsException e) {
throw new PSQLException(
GT.tr("Parsed url={0} failed.", url), PSQLState.INVALID_PARAMETER_VALUE);
}
return Arrays.toString(hostSpecs);
}
/**
* Check whether the connections are valid in each cluster, and remove invalid connections.
*
* @return the number of connections removed from cache.
*/
public List<Integer> checkConnectionsValidity() {
List<Integer> ans = new ArrayList<>();
for (Entry<String, Cluster> entry : cachedClusters.entrySet()) {
Cluster cluster = entry.getValue();
int num = 0;
if (cluster != null) {
List<Integer> removes = cluster.checkConnectionsValidity();
for (int remove : removes) {
num += remove;
}
}
ans.add(num);
}
return ans;
}
/**
* Check datanode states of each cluster.
*
* @return the number of invalid data nodes of all cluster
*/
public int checkClusterStates() {
int invalidDataNodes = 0;
for (Entry<String, Cluster> entry : cachedClusters.entrySet()) {
Cluster cluster = entry.getValue();
if (cluster != null) {
invalidDataNodes += cluster.checkClusterState();
}
}
return invalidDataNodes;
}
/**
* Increment cachedCreatingConnectionSize.
* CachedCreatingConnectionSize indicates the number of connections that have been load balanced,
* but haven't been cached into cachedConnectionList yet.
*
* @param hostSpec hostSpec
* @param properties properties
* @return cachedCreatingConnectionSize after updating
*/
public int incrementCachedCreatingConnectionSize(HostSpec hostSpec, Properties properties) {
if (!checkEnableLeastConn(properties)) {
return 0;
}
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
if (cachedClusters.containsKey(URLIdentifier)) {
Cluster cluster = cachedClusters.get(URLIdentifier);
if (cluster != null) {
return cluster.incrementCachedCreatingConnectionSize(hostSpec);
}
} else {
LOGGER.info(GT.tr("Can not find cluster: {0} in cached clusters.", URLIdentifier));
}
return 0;
}
/**
* Decrement cachedCreatingConnectionSize.
* CachedCreatingConnectionSize indicates the number of connections that have been load balanced,
* but haven't been cached into cachedConnectionList yet.
*
* @param hostSpec hostSpec
* @param properties properties
* @return cachedCreatingConnectionSize after updating
*/
public int decrementCachedCreatingConnectionSize(HostSpec hostSpec, Properties properties) {
if (!checkEnableLeastConn(properties)) {
return 0;
}
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
if (cachedClusters.containsKey(URLIdentifier)) {
Cluster cluster = cachedClusters.get(URLIdentifier);
if (cluster != null) {
return cluster.decrementCachedCreatingConnectionSize(hostSpec);
}
} else {
LOGGER.info(GT.tr("Can not find cluster: {0} in cached clusters.", URLIdentifier));
}
return 0;
}
public void clear() {
synchronized (cachedClusters) {
cachedClusters.clear();
}
}
}

View File

@ -0,0 +1,361 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.PGProperty;
import org.postgresql.core.PGStream;
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.v3.ConnectionFactoryImpl;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.SslMode;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import javax.net.SocketFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Datanode.
*/
public class DataNode {
private static Log LOGGER = Logger.getLogger(DataNode.class.getName());
private static final String USERNAME_OR_PASSWORD_INVALID_ERROR_CODE = "28P01";
// the host of datanode (ip + port)
private final HostSpec hostSpec;
// cached connections
private final Map<PgConnection, ConnectionInfo> cachedConnectionList;
// number of cached connections, before set into cachedConnectionList, and after load balance by leastconn.
private final AtomicInteger cachedCreatingConnectionSize;
private volatile boolean dataNodeState;
public DataNode(final HostSpec hostSpec) {
this.hostSpec = hostSpec;
this.cachedConnectionList = new ConcurrentHashMap<>();
this.cachedCreatingConnectionSize = new AtomicInteger(0);
this.dataNodeState = true;
}
/**
* Set connection state.
*
* @param pgConnection pgConnection
* @param state state
*/
public void setConnectionState(final PgConnection pgConnection, final StatementCancelState state) {
ConnectionInfo connectionInfo = cachedConnectionList.get(pgConnection);
if (connectionInfo != null) {
connectionInfo.setConnectionState(state);
}
}
/**
* Set connection.
*
* @param pgConnection pgConnection
* @param properties properties
* @param hostSpec hostSpec
*/
public void setConnection(final PgConnection pgConnection, final Properties properties, final HostSpec hostSpec)
throws PSQLException {
if (pgConnection == null || properties == null || hostSpec == null) {
return;
}
if (!hostSpec.equals(this.hostSpec)) {
return;
}
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
cachedConnectionList.put(pgConnection, connectionInfo);
}
/**
* Get connection info if the connection exits.
*
* @param pgConnection pgConnection
* @return connectionInfo
*/
public ConnectionInfo getConnectionInfo(PgConnection pgConnection) {
if (pgConnection == null) {
return null;
}
return cachedConnectionList.get(pgConnection);
}
/**
* Get the size of cachedConnectionList.
*
* @return size of cachedConnectionList
*/
public int getCachedConnectionListSize() {
return cachedConnectionList.size();
}
/**
* Check dn state and the validity of properties.
*
* @param properties properties
* @return result (dnValid, dnInvalid, propertiesInvalid)
*/
public CheckDnStateResult checkDnStateAndProperties(Properties properties) {
boolean isDataNodeValid;
Properties singleNodeProperties = new Properties();
PGProperty.USER.set(singleNodeProperties, PGProperty.USER.get(properties));
PGProperty.PASSWORD.set(singleNodeProperties, PGProperty.PASSWORD.get(properties));
PGProperty.PG_DBNAME.set(singleNodeProperties, PGProperty.PG_DBNAME.get(properties));
PGProperty.PG_HOST.set(singleNodeProperties, hostSpec.getHost());
PGProperty.PG_PORT.set(singleNodeProperties, hostSpec.getPort());
try {
isDataNodeValid = checkDnState(singleNodeProperties);
} catch (PSQLException e) {
String cause = e.getCause() != null ? e.getCause().getMessage() : "";
LOGGER.info(GT.tr("Can not try connect to dn: {0}, {1}.", hostSpec.toString(), cause.toString()));
return CheckDnStateResult.DN_INVALID;
} catch (InvocationTargetException e) {
Throwable invocationTargetExceptionCause = e.getCause();
if (invocationTargetExceptionCause instanceof PSQLException) {
PSQLException psqlException = (PSQLException) invocationTargetExceptionCause;
String sqlState = psqlException.getSQLState();
if (USERNAME_OR_PASSWORD_INVALID_ERROR_CODE.equals(sqlState)) {
String cause = e.getCause() != null ? e.getCause().getMessage() : "";
LOGGER.info(GT.tr("Cached properties is invalid: {0}.", cause.toString()));
return CheckDnStateResult.PROPERTIES_INVALID;
}
}
String cause = e.getCause() != null ? e.getCause().getMessage() : "";
LOGGER.info(GT.tr("Can not try connect to dn: {0}, {1}.", hostSpec.toString(), cause.toString()));
return CheckDnStateResult.DN_INVALID;
}
if (isDataNodeValid) {
return CheckDnStateResult.DN_VALID;
} else {
return CheckDnStateResult.DN_INVALID;
}
}
/**
* Filter idle connections from cachedConnectionsList.
*
* @param quickAutoBalanceStartTime the time since the start of quickAutoBalance
* @return idle Connection list
*/
public List<ConnectionInfo> filterIdleConnections(final long quickAutoBalanceStartTime) {
synchronized (cachedConnectionList) {
List<ConnectionInfo> idleConnectionList = new ArrayList<>();
for (Entry<PgConnection, ConnectionInfo> entry : cachedConnectionList.entrySet()) {
ConnectionInfo connectionInfo = entry.getValue();
if (connectionInfo != null && connectionInfo.checkConnectionCanBeClosed(quickAutoBalanceStartTime)) {
idleConnectionList.add(connectionInfo);
}
}
return idleConnectionList;
}
}
/**
* The result of checking dn state.
*/
public enum CheckDnStateResult {
DN_VALID,
DN_INVALID,
PROPERTIES_INVALID
}
public void setDataNodeState(boolean isDnValid) {
this.dataNodeState = isDnValid;
}
public boolean getDataNodeState() {
return this.dataNodeState;
}
/**
* check a dn of the cluster if valid,
*
* @param properties properties
* @return if the dn is valid.
* @throws PSQLException psql exception
* @throws InvocationTargetException invocation target exception
*/
public boolean checkDnState(Properties properties) throws PSQLException, InvocationTargetException {
Object pgStream = new Object();
try {
HostSpec dnHostSpec = new HostSpec(properties.getProperty("PGHOST")
, Integer.parseInt(properties.getProperty("PGPORT")));
SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(properties);
SslMode sslMode = SslMode.of(properties);
Class<?> classForName = Class.forName("org.postgresql.core.v3.ConnectionFactoryImpl");
Object object = classForName.newInstance();
if (!(object instanceof ConnectionFactoryImpl)) {
LOGGER.error(GT.tr("classForName.newInstance() doesn't instanceof ConnectionFactoryImpl."));
return false;
}
ConnectionFactoryImpl connectionFactory = (ConnectionFactoryImpl) object;
Method method = connectionFactory.getClass().getDeclaredMethod("tryConnect", String.class,
String.class, Properties.class, SocketFactory.class, HostSpec.class, SslMode.class);
method.setAccessible(true);
pgStream = method.invoke(connectionFactory, properties.getProperty("user"),
properties.getProperty("PGDBNAME"), properties, socketFactory, dnHostSpec, sslMode);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
ClassNotFoundException e) {
throw new PSQLException("The queryExecutor of connection can't execute tryConnect",
PSQLState.WRONG_OBJECT_TYPE);
}
if (pgStream instanceof PGStream) {
return true;
} else {
LOGGER.error(GT.tr("Stream doesn't instanceof PGStream."));
return false;
}
}
/**
* Check cached connections validity, and remove invalid connections.
*
* @return the amount of removed connections.
*/
public int checkConnectionsValidity() {
int num = 0;
for (Entry<PgConnection, ConnectionInfo> entry : cachedConnectionList.entrySet()) {
PgConnection pgConnection = entry.getKey();
ConnectionInfo connectionInfo = entry.getValue();
if (!connectionInfo.checkConnectionIsValid()) {
cachedConnectionList.remove(pgConnection);
num++;
}
}
return num;
}
/**
* Close cached connections, and clear cachedConnectionList.
* JDBC execute clearCachedConnections when jdbc find an invalid datanode.
*
* @return size of cachedConnectionList before cleared
*/
public int clearCachedConnections() {
synchronized (cachedConnectionList) {
int num = cachedConnectionList.size();
for (Map.Entry<PgConnection, ConnectionInfo> entry : cachedConnectionList.entrySet()) {
PgConnection pgConnection = entry.getKey();
if (pgConnection != null) {
QueryExecutor queryExecutor = pgConnection.getQueryExecutor();
if (queryExecutor != null && !queryExecutor.isClosed()) {
queryExecutor.setAvailability(false);
}
} else {
LOGGER.error(GT.tr("Fail to close connection, pgConnection = null."));
}
}
cachedConnectionList.clear();
return num;
}
}
/**
* Close connection.
*
* @param pgConnection pgConnection
* @return if closed
*/
public boolean closeConnection(PgConnection pgConnection) {
if (pgConnection == null) {
return false;
}
ConnectionInfo connectionInfo = cachedConnectionList.remove(pgConnection);
if (connectionInfo != null) {
try {
pgConnection.close();
return true;
} catch (SQLException e) {
LOGGER.info(GT.tr("Connection closed failed."), e);
return false;
}
}
return false;
}
/**
* get cachedCreatingConnectionSize
*
* @return cachedCreatingConnectionSize
*/
public int getCachedCreatingConnectionSize() {
return cachedCreatingConnectionSize.get();
}
/**
* increment cachedCreatingConnectionSize
*
* @return cachedCreatingConnectionSize after updated
*/
public int incrementCachedCreatingConnectionSize() {
return cachedCreatingConnectionSize.incrementAndGet();
}
/**
* decrement cachedCreatingConnectionSize
*
* @return cachedCreatingConnectionSize after updated
*/
public int decrementCachedCreatingConnectionSize() {
if (cachedCreatingConnectionSize.get() == 0) {
// Some of junit tests don't load balance, but setConnection, can generate this error.
LOGGER.error(GT.tr("CachedCreatingConnectionSize should not be less than 0, reset to 0."));
return 0;
}
return cachedCreatingConnectionSize.decrementAndGet();
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final DataNode dataNode = (DataNode) obj;
return dataNodeState == dataNode.dataNodeState && Objects.equals(hostSpec, dataNode.hostSpec) &&
Objects.equals(cachedConnectionList, dataNode.cachedConnectionList);
}
@Override
public int hashCode() {
return Objects.hash(hostSpec);
}
}

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.PGProperty;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Load balance heartBeating.
*/
public class LoadBalanceHeartBeating {
private static final int INITIAL_DELAY = 1000;
private static final int CHECK_CLUSTER_STATE_PERIOD = 1000 * 20;
// unit: CLOSE_CONNECTION_PER_SECOND
private static final int CLOSE_CONNECTION_PERIOD = 1000 * 5;
// A heartBeating thread used to check clusters' state.
// If user has configured 'autoBalance=leastconn', jdbc will start checkConnectionScheduledExecutorService.
private static final ScheduledExecutorService checkClusterStateScheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "loadBalanceHeartBeatingThread"));
// A heartBeating thread used to close abandoned connections.
// If user has configured 'autoBalance=leastconn&enableQuickAutoBalance=true', jdbc will start closeConnectionExecutorService.
private static final ScheduledExecutorService closeConnectionExecutorService = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "closeConnectionsHeartBeatingThread"));
private static Log LOGGER = Logger.getLogger(LoadBalanceHeartBeating.class.getName());
private static volatile ScheduledFuture<?> checkClusterStateScheduledFuture = null;
private static volatile ScheduledFuture<?> closeConnectionScheduledFuture = null;
// If singleton checkConnectionScheduledExecutorService has started.
private static volatile boolean leastConnStarted = false;
// If singleton closeConnectionExecutorService has started.
private static volatile boolean quickAutoBalanceStarted = false;
/**
* Whether quickAutoBalance has started.
*
* @return whether quickAutoBalance has started
*/
public static boolean isLoadBalanceHeartBeatingStarted() {
return leastConnStarted && quickAutoBalanceStarted;
}
/**
* Get if quickAutoBalance has started.
*
* @return if quickAutoBalance has started
*/
public static boolean isQuickAutoBalanceStarted() {
return quickAutoBalanceStarted;
}
/**
* Get if leastConnStarted has started.
*
* @return if leastConnStarted has started
*/
public static boolean isLeastConnStarted() {
return leastConnStarted;
}
/**
* Start scheduled executor service. There are two singleton scheduled executor service.
* If user has configured 'autoBalance=leastconn', jdbc will start checkConnectionScheduledExecutorService.
* If user has configured 'autoBalance=leastconn&enableQuickAutoBalance=true', jdbc will start closeConnectionExecutorService.
*
* @param properties properties
*/
public static void startScheduledExecutorService(Properties properties) {
if (!leastConnStarted) {
if (ConnectionManager.checkEnableLeastConn(properties)) {
synchronized (LoadBalanceHeartBeating.class) {
if (!leastConnStarted) {
leastConnStarted = true;
checkClusterStateScheduledFuture = checkClusterStateScheduledExecutorService
.scheduleAtFixedRate(LoadBalanceHeartBeating::checkClusterStateScheduleTask,
INITIAL_DELAY, CHECK_CLUSTER_STATE_PERIOD, TimeUnit.MILLISECONDS);
LOGGER.info(GT.tr("Start scheduleExecutorService, period:{0} milliseconds.",
CHECK_CLUSTER_STATE_PERIOD));
}
}
}
}
if (!quickAutoBalanceStarted) {
if (ConnectionManager.checkEnableLeastConn(properties)
&& ConnectionInfo.ENABLE_QUICK_AUTO_BALANCE_PARAMS.equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) {
synchronized (LoadBalanceHeartBeating.class) {
if (!quickAutoBalanceStarted) {
quickAutoBalanceStarted = true;
closeConnectionScheduledFuture = closeConnectionExecutorService
.scheduleAtFixedRate(LoadBalanceHeartBeating::closeAbandonedConnections,
INITIAL_DELAY, CLOSE_CONNECTION_PERIOD, TimeUnit.MILLISECONDS);
LOGGER.info(GT.tr("Start closeConnectionScheduledFuture, period:{0} milliseconds.",
CLOSE_CONNECTION_PERIOD));
}
}
}
}
}
private static void checkClusterStateScheduleTask() {
checkClusterState();
checkConnectionValidity();
}
private static void closeAbandonedConnections() {
List<Integer> closedConnections = ConnectionManager.getInstance().closeConnections();
int sum = closedConnections.stream().mapToInt(Integer::intValue).sum();
LOGGER.info(GT.tr("Scheduled task: closeAbandonedConnections(), thread id: {0}, " +
"amount of closed connections: {1}.", Thread.currentThread().getId(), sum));
}
private static void checkClusterState() {
int invalidDataNodes = ConnectionManager.getInstance().checkClusterStates();
LOGGER.info(GT.tr("Scheduled task: checkClusterState(), thread id: {0}, " +
"amount of invalid data nodes: {1}.", Thread.currentThread().getId(), invalidDataNodes));
}
private static void checkConnectionValidity() {
List<Integer> removes = ConnectionManager.getInstance().checkConnectionsValidity();
int sum = removes.stream().mapToInt(Integer::intValue).sum();
LOGGER.info(GT.tr("Scheduled task: checkConnectionValidity(), thread id: {0}, " +
"amount of removed connections: {1}.", Thread.currentThread().getId(), sum));
}
/**
* Stop scheduled executor service.
*/
public static void stop() {
if (checkClusterStateScheduledFuture != null || closeConnectionScheduledFuture != null) {
synchronized (LoadBalanceHeartBeating.class) {
if (checkClusterStateScheduledFuture != null) {
checkClusterStateScheduledFuture.cancel(true);
leastConnStarted = false;
}
if (closeConnectionScheduledFuture != null) {
closeConnectionScheduledFuture.cancel(true);
quickAutoBalanceStarted = false;
}
}
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.quickautobalance;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import java.lang.reflect.Field;
/**
* Reflect util
*/
public class ReflectUtil {
private static Log LOGGER = Logger.getLogger(ReflectUtil.class.getName());
/**
* Get the private property of an object.
*
* @param classz class of object
* @param object object
* @param t class of the private property
* @param fieldName of the private property
* @param <T> class of the private property
* @return the private property
*/
public static <T> T getField(Class classz, Object object, Class<T> t, String fieldName) {
try {
Field field = classz.getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(object);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOGGER.error("get reflect field " + classz + "." + fieldName + " error.");
}
return null;
}
/**
* Set the private property of an object.
*
* @param classz class of object
* @param object object
* @param fieldName of the private property
* @param value value of the private property
*/
public static void setField(Class classz, Object object, String fieldName, Object value) {
try {
Field field = classz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(object, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOGGER.error("set reflect field " + classz + "." + fieldName + " error.");
}
}
}

View File

@ -109,6 +109,20 @@ public class TestUtil {
return System.getProperty("server", "localhost");
}
/*
* Returns the Secondary Test server
*/
public static String getSecondaryServer() {
return System.getProperty("secondaryServer", "localhost");
}
/*
* Returns the third Test server
*/
public static String getSecondaryServer2() {
return System.getProperty("secondaryServer2", "localhost");
}
/*
* Returns the Test port
*/
@ -116,6 +130,20 @@ public class TestUtil {
return Integer.parseInt(System.getProperty("port", System.getProperty("def_pgport")));
}
/*
* Returns the Secondary Test port
*/
public static int getSecondaryPort() {
return Integer.parseInt(System.getProperty("secondaryPort", System.getProperty("def_pgport")));
}
/*
* Returns the Third Test port
*/
public static int getSecondaryServerPort2() {
return Integer.parseInt(System.getProperty("secondaryServerPort2", System.getProperty("def_pgport")));
}
/*
* Returns the server side prepared statement threshold.
*/

View File

@ -0,0 +1,851 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.test.quickautobalance;
import org.junit.Test;
import org.postgresql.QueryCNListUtils;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.quickautobalance.Cluster;
import org.postgresql.quickautobalance.ConnectionInfo;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.DataNode;
import org.postgresql.quickautobalance.ReflectUtil;
import org.postgresql.test.TestUtil;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Cluster test
*/
public class ClusterTest {
private static Log LOGGER = Logger.getLogger(ClusterTest.class.getName());
private static final String FAKE_HOST = "127.127.217.217";
private static final String FAKE_PORT = "1";
private static final String FAKE_USER = "fakeuser";
private static final String FAKE_PASSWORD = "fakepassword";
private static final int DN_NUM = 3;
private PgConnection getConnection(String url, Properties properties) throws SQLException {
return DriverManager.getConnection(url, properties).unwrap(PgConnection.class);
}
private HostSpec[] initHostSpecs() {
HostSpec[] hostSpecs = new HostSpec[DN_NUM];
hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2());
return hostSpecs;
}
private HostSpec[] initHostSpecsWithInvalidNode() {
HostSpec[] hostSpecs = new HostSpec[DN_NUM];
hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
hostSpecs[2] = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT));
return hostSpecs;
}
private boolean checkHostSpecs(HostSpec[] hostSpecs) {
if (hostSpecs.length != DN_NUM) {
return false;
}
for (int i = 0; i < DN_NUM; i++) {
if ("localhost".equals(hostSpecs[i].getHost())) {
return false;
}
}
return true;
}
private Properties initPriority(HostSpec[] hostSpecs) {
String host = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String port = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", host);
properties.setProperty("PGPORT", port);
properties.setProperty("PGPORTURL", port);
properties.setProperty("PGHOST", host);
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
return properties;
}
private String initURL(HostSpec[] hostSpecs) {
String host1 = hostSpecs[0].getHost() + ":" + hostSpecs[0].getPort();
String host2 = hostSpecs[1].getHost() + ":" + hostSpecs[1].getPort();
String host3 = hostSpecs[2].getHost() + ":" + hostSpecs[2].getPort();
return "jdbc:postgresql://" + host1 + "," + host2 + "," + host3 + "/" + TestUtil.getDatabase();
}
@Test
public void checkConnectionsValidityTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
int total = 20;
int remove = 5;
List<PgConnection> pgConnectionList = new ArrayList<>();
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
pgConnectionList.add(pgConnection);
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
List<Integer> result = cluster.checkConnectionsValidity();
int sum = result.stream().reduce(Integer::sum).orElse(0);
assertEquals(0, sum);
for (int i = 0; i < remove; i++) {
try {
pgConnectionList.get(i).close();
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
sum = 0;
result = cluster.checkConnectionsValidity();
sum = result.stream().reduce(Integer::sum).orElse(0);
assertEquals(remove, sum);
ConnectionManager.getInstance().clear();
}
@Test
public void setMinReservedConPerClusterDefaultParamsTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
assertFalse(cluster.isEnableMinReservedConPerCluster());
assertFalse(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 0);
assertEquals(cluster.getMinReservedConPerDatanode(), 0);
}
@Test (expected = SQLException.class)
public void setMinReservedConPerClusterParsedFailedTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "asafaas");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState());
throw e;
}
}
@Test (expected = SQLException.class)
public void setMinReservedConPerClusterParsedTooSmallTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "-1");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test (expected = SQLException.class)
public void setMinReservedConPerClusterParsedTooBigTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "200");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test (expected = SQLException.class)
public void setMinReservedConPerDatanodeParsedFailedTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerDatanode", "asafaas");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState());
throw e;
}
}
@Test (expected = SQLException.class)
public void setMinReservedConPerDatanodeParsedTooSmallTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerDatanode", "-1");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test (expected = SQLException.class)
public void setMinReservedConPerDatanodeParsedTooBigTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url1 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerDatanode", "200");
try (PgConnection pgConnection = getConnection(url1, properties)) {
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test
public void updateParamsFailedTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
assertFalse(cluster.isEnableMinReservedConPerCluster());
assertFalse(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 0);
assertEquals(cluster.getMinReservedConPerDatanode(), 0);
String url2 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "40");
properties.setProperty("minReservedConPerDatanode", "50");
PgConnection pgConnection1 = getConnection(url2, properties);
cluster.setConnection(pgConnection1, properties);
assertTrue(cluster.isEnableMinReservedConPerCluster());
assertTrue(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 40);
assertEquals(cluster.getMinReservedConPerDatanode(), 50);
pgConnection1.close();
String url3 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "60");
properties.setProperty("minReservedConPerDatanode", "70");
PgConnection pgConnection2 = getConnection(url3, properties);
cluster.setConnection(pgConnection2, properties);
assertTrue(cluster.isEnableMinReservedConPerCluster());
assertTrue(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 40);
assertEquals(cluster.getMinReservedConPerDatanode(), 50);
pgConnection2.close();
ConnectionManager.getInstance().clear();
}
@Test
public void setConnectionStateSuccessTest() throws SQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
assertFalse(cluster.isEnableMinReservedConPerCluster());
assertFalse(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 0);
assertEquals(cluster.getMinReservedConPerDatanode(), 0);
String url2 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "40");
properties.setProperty("minReservedConPerDatanode", "50");
PgConnection pgConnection1 = getConnection(url2, properties);
cluster.setConnection(pgConnection1, properties);
assertTrue(cluster.isEnableMinReservedConPerCluster());
assertTrue(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 40);
assertEquals(cluster.getMinReservedConPerDatanode(), 50);
pgConnection1.close();
String url3 = initURL(hostSpecs) + "?autoBalance=leastconn";
properties.setProperty("minReservedConPerCluster", "20");
properties.setProperty("minReservedConPerDatanode", "30");
PgConnection pgConnection2 = getConnection(url3, properties);
cluster.setConnection(pgConnection2, properties);
assertTrue(cluster.isEnableMinReservedConPerCluster());
assertTrue(cluster.isEnableMinReservedConPerDatanode());
assertEquals(cluster.getMinReservedConPerCluster(), 20);
assertEquals(cluster.getMinReservedConPerDatanode(), 30);
pgConnection2.close();
ConnectionManager.getInstance().clear();
}
@Test
public void setConnectionTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
PgConnection pgConnection;
try {
pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
ConnectionInfo connectionInfo = cluster.getConnectionInfo(pgConnection);
assertEquals(pgConnection, connectionInfo.getPgConnection());
cluster.setConnection(null, properties);
cluster.setConnection(pgConnection, null);
ConnectionManager.getInstance().clear();
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
private void setConnection(Cluster cluster, String ip, int port, Properties properties) {
String url = "jdbc:postgresql://" + ip + ":" + port + "/" + TestUtil.getDatabase();
try {
final PgConnection pgConnection = getConnection(url, properties);
cluster.incrementCachedCreatingConnectionSize(new HostSpec(ip, port));
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void sortDnsByLeastConnTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
int num1 = 4;
int num2 = 6;
int num3 = 5;
for (int i = 0; i < num1; i++) {
setConnection(cluster, TestUtil.getServer(), TestUtil.getPort(), properties);
}
for (int i = 0; i < num2; i++) {
setConnection(cluster, TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort(), properties);
}
for (int i = 0; i < num3; i++) {
setConnection(cluster, TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2(), properties);
}
List<HostSpec> result = cluster.sortDnsByLeastConn(Arrays.asList(hostSpecs));
LOGGER.info(GT.tr("after sort: {0}", result.toString()));
assertEquals(TestUtil.getServer(), result.get(0).getHost());
assertEquals(TestUtil.getPort(), result.get(0).getPort());
assertEquals(TestUtil.getSecondaryServer(), result.get(2).getHost());
assertEquals(TestUtil.getSecondaryPort(), result.get(2).getPort());
assertEquals(TestUtil.getSecondaryServer2(), result.get(1).getHost());
assertEquals(TestUtil.getSecondaryServerPort2(), result.get(1).getPort());
ConnectionManager.getInstance().clear();
}
@Test
public void sortDnsByLeastConnWithOneNodeFailedTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
int num1 = 4;
int num2 = 6;
int num3 = 5;
for (int i = 0; i < num1; i++) {
setConnection(cluster, TestUtil.getServer(), TestUtil.getPort(), properties);
}
for (int i = 0; i < num2; i++) {
setConnection(cluster, TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort(), properties);
}
for (int i = 0; i < num3; i++) {
setConnection(cluster, TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2(), properties);
}
Map<HostSpec, DataNode> cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class, "cachedDnList");
DataNode dataNode = cachedDnList.getOrDefault(hostSpecs[0], null);
ReflectUtil.setField(DataNode.class, dataNode, "dataNodeState", false);
List<HostSpec> result = cluster.sortDnsByLeastConn(Arrays.asList(hostSpecs));
LOGGER.info(GT.tr("after sort: {0}", result.toString()));
assertEquals(TestUtil.getServer(), result.get(2).getHost());
assertEquals(TestUtil.getPort(), result.get(2).getPort());
assertEquals(TestUtil.getSecondaryServer(), result.get(1).getHost());
assertEquals(TestUtil.getSecondaryPort(), result.get(1).getPort());
assertEquals(TestUtil.getSecondaryServer2(), result.get(0).getHost());
assertEquals(TestUtil.getSecondaryServerPort2(), result.get(0).getPort());
ConnectionManager.getInstance().clear();
}
@Test
public void checkDnStateSpecSuccessTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
String URLIdentifier = String.valueOf(hostSpecs);
String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties clusterProperties = new Properties();
clusterProperties.setProperty("user", TestUtil.getUser());
clusterProperties.setProperty("password", TestUtil.getPassword());
clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
clusterProperties.setProperty("PGHOSTURL", pgHostUrl);
clusterProperties.setProperty("PGPORT", pgPortUrl);
clusterProperties.setProperty("PGPORTURL", pgPortUrl);
clusterProperties.setProperty("PGHOST", pgHostUrl);
Cluster cluster = new Cluster(URLIdentifier, clusterProperties);
for (int i = 0; i < DN_NUM; i++) {
assertTrue(cluster.checkDnState(hostSpecs[i]));
}
ConnectionManager.getInstance().clear();
}
@Test
public void checkDnStateUserOrPasswordErrorTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
String URLIdentifier = String.valueOf(hostSpecs);
String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties invalidProperties = new Properties();
invalidProperties.setProperty("user", FAKE_USER);
invalidProperties.setProperty("password", FAKE_PASSWORD);
invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
invalidProperties.setProperty("PGHOSTURL", pgHostUrl);
invalidProperties.setProperty("PGPORT", pgPortUrl);
invalidProperties.setProperty("PGPORTURL", pgPortUrl);
invalidProperties.setProperty("PGHOST", pgHostUrl);
Cluster cluster = new Cluster(URLIdentifier, invalidProperties);
for (int i = 0; i < DN_NUM; i++) {
assertFalse(cluster.checkDnState(hostSpecs[i]));
}
Properties validProperties = new Properties();
validProperties.setProperty("user", TestUtil.getUser());
validProperties.setProperty("password", TestUtil.getPassword());
validProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
validProperties.setProperty("PGHOSTURL", pgHostUrl);
validProperties.setProperty("PGPORT", pgPortUrl);
validProperties.setProperty("PGPORTURL", pgPortUrl);
validProperties.setProperty("PGHOST", pgHostUrl);
cluster.setProperties(invalidProperties);
cluster.setProperties(validProperties);
for (int i = 0; i < DN_NUM; i++) {
assertTrue(cluster.checkDnState(hostSpecs[i]));
}
ConnectionManager.getInstance().clear();
}
@Test
public void checkDnStateConnectFailedTest() throws PSQLException {
HostSpec[] hostSpecs = new HostSpec[DN_NUM];
for (int i = 0; i < DN_NUM; i++) {
hostSpecs[i] = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT));
}
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
String URLIdentifier = Arrays.toString(hostSpecs);
String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties invalidProperties = new Properties();
invalidProperties.setProperty("user", TestUtil.getUser());
invalidProperties.setProperty("password", TestUtil.getPassword());
invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
invalidProperties.setProperty("PGHOSTURL", pgHostUrl);
invalidProperties.setProperty("PGPORT", pgPortUrl);
invalidProperties.setProperty("PGPORTURL", pgPortUrl);
invalidProperties.setProperty("PGHOST", pgHostUrl);
Cluster cluster = new Cluster(URLIdentifier, invalidProperties);
for (int i = 0; i < DN_NUM; i++) {
assertFalse(cluster.checkDnState(hostSpecs[i]));
}
ConnectionManager.getInstance().clear();
}
@Test
public void checkConnectionStateSuccessTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
int total = 10;
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
int invalidDn = cluster.checkClusterState();
assertEquals(0, invalidDn);
ConnectionManager.getInstance().clear();
}
@Test
public void checkConnectionStateOneNodeFailedTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecsWithInvalidNode();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
int total = 10;
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
int invalidDn = cluster.checkClusterState();
assertEquals(1, invalidDn);
ConnectionManager.getInstance().clear();
}
@Test
public void checkConnectionStateAndQuickLoadBalanceTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
properties.setProperty("enableQuickAutoBalance", "true");
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
int total = 9;
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
// set dnState='false' of dn1
Map<HostSpec, DataNode> cachedDnList = ReflectUtil.getField(Cluster.class, cluster,
Map.class, "cachedDnList");
DataNode dataNode = cachedDnList.get(hostSpecs[0]);
dataNode.setDataNodeState(false);
int invalidDn = cluster.checkClusterState();
assertEquals(0, invalidDn);
// check cluster state: jdbc will find dnState change to true from false, and execute quickAutoBalance.
Queue<ConnectionInfo> abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster,
Queue.class, "abandonedConnectionList");
assertEquals(6, abandonedConnectionList.size());
ConnectionManager.getInstance().clear();
}
@Test
public void checkConnectionStateAndQuickLoadBalanceWithParamsTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
// init properties with quickLoadBalance.
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("minReservedConPerDatanode", "0");
properties.setProperty("minReservedConPerCluster", "75");
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs) + "?autoBalance=leastconn";
int total = 12;
// set connection
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
// set dnState='false' of dn1
Map<HostSpec, DataNode> cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class,
"cachedDnList");
DataNode dataNode = cachedDnList.get(hostSpecs[0]);
dataNode.setDataNodeState(false);
// check cluster state: jdbc will find dnState change to true from false, and execute quickAutoBalance.
int invalidDn = cluster.checkClusterState();
assertEquals(0, invalidDn);
Queue<ConnectionInfo> abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster,
Queue.class, "abandonedConnectionList");
assertEquals((int) (total / 3 * 2 * (100 - 75) / 100), abandonedConnectionList.size());
ConnectionManager.getInstance().clear();
}
@Test
public void incrementCachedCreatingConnectionSizeTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
// init properties with quickLoadBalance.
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
Map<HostSpec, DataNode> cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class,
"cachedDnList");
cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(1, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(2, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
cluster.incrementCachedCreatingConnectionSize(hostSpecs[1]);
assertEquals(1, cachedDnList.get(hostSpecs[1]).getCachedCreatingConnectionSize());
assertEquals(0, cluster.incrementCachedCreatingConnectionSize(new HostSpec(FAKE_HOST,
Integer.parseInt(FAKE_PORT))));
ConnectionManager.getInstance().clear();
}
@Test
public void decrementCachedCreatingConnectionSizeTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
// init properties with quickLoadBalance.
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
Map<HostSpec, DataNode> cachedDnList = ReflectUtil.getField(Cluster.class, cluster,
Map.class, "cachedDnList");
cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]);
cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(2, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(1, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(0, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]);
assertEquals(0, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize());
assertEquals(0, cluster.incrementCachedCreatingConnectionSize(new HostSpec(FAKE_HOST,
Integer.parseInt(FAKE_PORT))));
ConnectionManager.getInstance().clear();
}
@Test
public void setPropertiesTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties1 = initPriority(hostSpecs);
properties1.setProperty("autoBalance", "leastconn");
String URLIdentifier = QueryCNListUtils.keyFromURL(properties1);
Cluster cluster = new Cluster(URLIdentifier, properties1);
List<Properties> cachedPropertiesList = ReflectUtil.getField(Cluster.class, cluster, List.class,
"cachedPropertiesList");
assertEquals(1, cachedPropertiesList.size());
assertEquals(TestUtil.getUser(), cachedPropertiesList.get(0).getProperty("user", ""));
assertEquals(TestUtil.getPassword(), cachedPropertiesList.get(0).getProperty("password", ""));
Properties properties2 = initPriority(hostSpecs);
properties2.setProperty("user", "fakeuser");
properties2.setProperty("password", "fakepassword");
cluster.setProperties(properties2);
assertEquals(2, cachedPropertiesList.size());
assertEquals("fakeuser", cachedPropertiesList.get(1).getProperty("user", ""));
assertEquals("fakepassword", cachedPropertiesList.get(1).getProperty("password", ""));
Properties properties3 = initPriority(hostSpecs);
properties3.setProperty("password", "fakepassword2");
cluster.setProperties(properties3);
assertEquals(2, cachedPropertiesList.size());
assertEquals(TestUtil.getUser(), cachedPropertiesList.get(0).getProperty("user", ""));
assertEquals("fakepassword2", cachedPropertiesList.get(0).getProperty("password", ""));
ConnectionManager.getInstance().clear();
}
@Test
public void closeConnectionTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
Arrays.sort(hostSpecs);
String URLIdentifier = QueryCNListUtils.keyFromURL(properties);
Cluster cluster = new Cluster(URLIdentifier, properties);
String url = initURL(hostSpecs);
int total = 50;
List<PgConnection> pgConnectionList = new ArrayList<>();
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(url, properties);
cluster.setConnection(pgConnection, properties);
pgConnectionList.add(pgConnection);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
Queue<ConnectionInfo> abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster,
Queue.class, "abandonedConnectionList");
ReflectUtil.setField(Cluster.class, cluster, "totalAbandonedConnectionSize", total);
ReflectUtil.setField(Cluster.class, cluster, "quickAutoBalanceStartTime", Long.MAX_VALUE);
for (PgConnection pgConnection : pgConnectionList) {
ConnectionInfo connectionInfo = cluster.getConnectionInfo(pgConnection);
abandonedConnectionList.add(connectionInfo);
}
assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections());
assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections());
assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections());
assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections());
assertEquals(total - 4 * (int) (Math.ceil((double) total * 0.2)), cluster.closeConnections());
ConnectionManager.getInstance().clear();
}
}

View File

@ -0,0 +1,271 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.test.quickautobalance;
import org.junit.Test;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.quickautobalance.ConnectionInfo;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* ConnectionInfo Test
*/
public class ConnectionInfoTest {
private HostSpec initHost() {
return new HostSpec(TestUtil.getServer(), TestUtil.getPort());
}
private Properties initProperties() {
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
return properties;
}
@Test
public void createConnectionInfoWithDefaultParamsTest() throws SQLException {
// ConnectionInfo without quickAutoBalance
HostSpec hostSpec = initHost();
Properties properties = initProperties();
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertEquals(connectionInfo.getPgConnection(), pgConnection);
assertEquals(connectionInfo.getHostSpec(), hostSpec);
assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IDLE);
assertEquals(connectionInfo.getMaxIdleTimeBeforeTerminal(), 30);
assertEquals(connectionInfo.getAutoBalance(), "");
assertFalse(connectionInfo.isEnableQuickAutoBalance());
pgConnection.close();
}
@Test(expected = SQLException.class)
public void createConnectionInfoEnableQuickAutoBalanceParsedFailedTest() throws SQLException {
Properties properties = initProperties();
properties.setProperty("autoBalance", "luanma");
properties.setProperty("enableQuickAutoBalance", "luanma");
HostSpec hostSpec = initHost();
try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class)) {
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
} catch (PSQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test
public void createConnectionInfoEnableQuickAutoBalanceFalseTest() throws SQLException {
Properties properties = initProperties();
properties.setProperty("autoBalance", "luanma");
properties.setProperty("enableQuickAutoBalance", "false");
HostSpec hostSpec = initHost();
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertEquals("luanma", connectionInfo.getAutoBalance());
assertFalse(connectionInfo.isEnableQuickAutoBalance());
}
@Test
public void createConnectionInfoSuccessTest() throws SQLException {
Properties properties = initProperties();
HostSpec hostSpec = initHost();
properties.setProperty("maxIdleTimeBeforeTerminal", "66");
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("enableQuickAutoBalance", "true");
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertEquals(connectionInfo.getMaxIdleTimeBeforeTerminal(), 66);
assertTrue(connectionInfo.isEnableQuickAutoBalance());
assertEquals("leastconn", connectionInfo.getAutoBalance());
pgConnection.close();
}
@Test(expected = PSQLException.class)
public void createConnectionInfoMaxIdleTimeBeforeTerminalParsedFailedTest() throws SQLException {
Properties properties = initProperties();
properties.setProperty("maxIdleTimeBeforeTerminal", "111111@@");
HostSpec hostSpec = initHost();
try {
Connection connection = DriverManager.getConnection(TestUtil.getURL(), properties);
PgConnection pgConnection = connection.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState());
throw e;
}
}
@Test(expected = SQLException.class)
public void createConnectionInfoMaxIdleTimeBeforeTerminalTooBigTest() throws SQLException {
Properties properties = initProperties();
HostSpec hostSpec = initHost();
properties.setProperty("maxIdleTimeBeforeTerminal", String.valueOf(Long.MAX_VALUE));
properties.setProperty("enableQuickAutoBalance", "false");
try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class)) {
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test(expected = SQLException.class)
public void createConnectionInfoMaxIdleTimeBeforeTerminalTooSmallTest() throws SQLException {
Properties properties = initProperties();
HostSpec hostSpec = initHost();
properties.setProperty("maxIdleTimeBeforeTerminal", String.valueOf(-100));
properties.setProperty("enableQuickAutoBalance", "false");
try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class)) {
new ConnectionInfo(pgConnection, properties, hostSpec);
} catch (SQLException e) {
e.printStackTrace();
assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState());
throw e;
}
}
@Test
public void checkConnectionCanBeClosedTestPgConnectionNullTest() throws InterruptedException, PSQLException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
PgConnection pgConnection;
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
Thread.sleep(1100);
ConnectionInfo connectionInfo = new ConnectionInfo(null, properties, hostSpec);
assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis()));
}
@Test
public void checkConnectionCanBeClosedTestPgConnectionUnableQuickAutoBalance() throws SQLException,
InterruptedException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "false");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
Thread.sleep(1100);
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis()));
pgConnection.close();
}
@Test
public void checkConnectionCanBeClosedTestStartTimeSmallerThanCreateTimeStamp() throws SQLException,
InterruptedException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
Thread.sleep(1100);
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis() - 1000 * 10));
pgConnection.close();
}
@Test
public void checkConnectionCanBeClosedTestConnectionStateNoEqualsToIDLE() throws SQLException, InterruptedException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
connectionInfo.setConnectionState(StatementCancelState.IN_QUERY);
Thread.sleep(1100);
assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis()));
pgConnection.close();
}
@Test
public void checkConnectionCanBeCloseTestIDLETimeToShort() throws SQLException, InterruptedException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "10");
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
Thread.sleep(1100);
assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis()));
pgConnection.close();
}
@Test
public void checkConnectionCanBeCloseTestSuccessTest() throws SQLException, InterruptedException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
Thread.sleep(1100);
assertTrue(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis()));
pgConnection.close();
}
@Test
public void checkConnectionIsValidTest() throws SQLException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
assertTrue(connectionInfo.checkConnectionIsValid());
}
@Test
public void checkConnectionIsValidFailedTest() throws SQLException {
HostSpec hostSpec = initHost();
Properties properties = initProperties();
PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties)
.unwrap(PgConnection.class);
ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec);
pgConnection.getQueryExecutor().setAvailability(false);
pgConnection.close();
long before = System.currentTimeMillis();
assertFalse(connectionInfo.checkConnectionIsValid());
long after = System.currentTimeMillis();
}
}

View File

@ -0,0 +1,320 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.test.quickautobalance;
import org.junit.Test;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.quickautobalance.Cluster;
import org.postgresql.quickautobalance.ConnectionInfo;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.test.TestUtil;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
*
*/
public class ConnectionManagerTest {
private static Log LOGGER = Logger.getLogger(ConnectionManagerTest.class.getName());
private static final String USER = TestUtil.getUser();
private static final String PASSWORD = TestUtil.getPassword();
private static final String MASTER_1 = TestUtil.getServer() + ":" + TestUtil.getPort();
private static final String SECONDARY_1 = TestUtil.getSecondaryServer() + ":" + TestUtil.getSecondaryPort();
private static final String SECONDARY_2 = TestUtil.getSecondaryServer2() + ":" + TestUtil.getSecondaryServerPort2();
private static final String DATABASE = TestUtil.getDatabase();
private static final int DN_NUM = 3;
private String initURLWithLeastConn() {
return "jdbc:postgresql://" + MASTER_1 + "," + SECONDARY_1
+ "," + SECONDARY_2 + "/" + DATABASE + "?autoBalance=leastconn&loggerLevel=OFF";
}
private PgConnection getConnection(String url, String user, String password) throws SQLException {
return DriverManager.getConnection(url, user, password).unwrap(PgConnection.class);
}
private HostSpec[] initHostSpec() {
HostSpec[] hostSpecs = new HostSpec[DN_NUM];
hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2());
return hostSpecs;
}
private Properties initPriority(HostSpec[] hostSpecs) {
String host = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String port = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", host);
properties.setProperty("PGPORT", port);
properties.setProperty("PGPORTURL", port);
properties.setProperty("PGHOST", host);
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
return properties;
}
@Test
public void setConnectionTest() throws ClassNotFoundException, SQLException {
if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport"))
|| String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) {
return;
}
Class.forName("org.postgresql.Driver");
String url = initURLWithLeastConn();
PgConnection connection1 = getConnection(url, USER, PASSWORD);
String urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
Cluster cluster = ConnectionManager.getInstance().getCluster(urlIdentifier);
assertNotEquals(cluster, null);
ConnectionInfo connectionInfo = cluster.getConnectionInfo(connection1);
assertNotEquals(connectionInfo, null);
assertEquals(connectionInfo.getPgConnection(), connection1);
PgConnection connection2 = getConnection(url, USER, PASSWORD);
urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
cluster = ConnectionManager.getInstance().getCluster(urlIdentifier);
assertNotEquals(cluster, null);
connectionInfo = cluster.getConnectionInfo(connection2);
assertNotEquals(connectionInfo, null);
assertEquals(connectionInfo.getPgConnection(), connection2);
ConnectionManager.getInstance().clear();
}
@Test
public void setConnectionStateTest() throws ClassNotFoundException, SQLException {
if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport"))
|| String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) {
return;
}
Class.forName("org.postgresql.Driver");
String url = initURLWithLeastConn();
// set connection state which exists.
PgConnection connection = getConnection(url, USER, PASSWORD);
ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.IDLE);
String urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
ConnectionInfo connectionInfo = ConnectionManager.getInstance()
.getCluster(urlIdentifier).getConnectionInfo(connection);
assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IDLE);
ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.IN_QUERY);
urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection);
assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IN_QUERY);
ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.CANCELING);
urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection);
assertEquals(connectionInfo.getConnectionState(), StatementCancelState.CANCELING);
ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.CANCELLED);
urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url);
connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection);
assertEquals(connectionInfo.getConnectionState(), StatementCancelState.CANCELLED);
ConnectionManager.getInstance().clear();
}
@Test
public void leastConnTest() throws SQLException, ClassNotFoundException {
if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport"))
|| String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) {
return;
}
Class.forName("org.postgresql.Driver");
String url = initURLWithLeastConn();
int num = 100;
HashMap<String, Integer> dns = new HashMap<>();
for (int i = 0; i < num; i++) {
PgConnection connection = getConnection(url, USER, PASSWORD);
String socketAddress = connection.getSocketAddress().split("/")[1];
dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1);
}
List<Integer> connectionCounts = new ArrayList<>();
for (Entry<String, Integer> entry : dns.entrySet()) {
connectionCounts.add(entry.getValue());
}
for (Integer connectionCount : connectionCounts) {
LOGGER.info(GT.tr("{0}", connectionCount));
}
for (int i = 0; i < connectionCounts.size() - 1; i++) {
assertTrue(Math.abs(connectionCounts.get(i) - connectionCounts.get(i + 1)) <= 1);
}
ConnectionManager.getInstance().clear();
}
@Test
public void leastConnIntoOneDnTest() throws SQLException, ClassNotFoundException {
if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport"))
|| String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) {
return;
}
Class.forName("org.postgresql.Driver");
String url1 = initURLWithLeastConn();
url1 += "&targetServerType=slave";
int num1 = 100;
HashMap<String, Integer> dns = new HashMap<>();
for (int i = 0; i < num1; i++) {
PgConnection connection = getConnection(url1, USER, PASSWORD);
String socketAddress = connection.getSocketAddress().split("/")[1];
dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1);
}
LOGGER.info(GT.tr("create 100 connections to slave nodes."));
for (Entry<String, Integer> entry : dns.entrySet()) {
int num = entry.getValue();
LOGGER.info(GT.tr("{0} : {1}", entry.getKey(), entry.getValue()));
assertEquals(50, num);
}
int num2 = 20;
String url2 = initURLWithLeastConn();
String lastAddress = "";
for (int i = 0; i < num2; i++) {
PgConnection connection = getConnection(url2, TestUtil.getUser(), TestUtil.getPassword());
String socketAddress = connection.getSocketAddress().split("/")[1];
dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1);
if (i > 1) {
assertEquals(lastAddress, socketAddress);
}
lastAddress = socketAddress;
}
LOGGER.info(GT.tr("Create 20 connections to all nodes."));
for (Entry<String, Integer> entry : dns.entrySet()) {
LOGGER.info(GT.tr("{0} : {1}", entry.getKey(), entry.getValue()));
}
ConnectionManager.getInstance().clear();
}
@Test
public void checkConnectionStateTest() {
// If this testcase fail, maybe heartBeatingThread remove invalid connections.
try {
if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport"))
|| String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) {
return;
}
Class.forName("org.postgresql.Driver");
String url = initURLWithLeastConn();
List<PgConnection> pgConnections = new ArrayList<>();
int total = 100;
int remove = 10;
for (int i = 0; i < total; i++) {
PgConnection connection = getConnection(url, USER, PASSWORD);
pgConnections.add(connection);
}
for (int i = 0; i < remove; i++) {
pgConnections.get(i).close();
}
assertEquals(Integer.valueOf(remove), ConnectionManager.getInstance().checkConnectionsValidity().get(0));
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
fail();
}
ConnectionManager.getInstance().clear();
}
@Test
public void setClusterTest() throws PSQLException {
HostSpec[] hostSpecs = initHostSpec();
Properties properties = initPriority(hostSpecs);
assertFalse(ConnectionManager.getInstance().setCluster(properties));
assertFalse(ConnectionManager.getInstance().setCluster(null));
properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
assertFalse(ConnectionManager.getInstance().setCluster(properties));
properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
assertTrue(ConnectionManager.getInstance().setCluster(properties));
assertFalse(ConnectionManager.getInstance().setCluster(properties));
ConnectionManager.getInstance().clear();
}
@Test
public void incrementCachedCreatingConnectionSize() throws PSQLException {
HostSpec[] hostSpecs = initHostSpec();
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
ConnectionManager.getInstance().setCluster(properties);
assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(1, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], null));
assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
properties.setProperty("autoBalance", "");
assertEquals(0, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
properties.setProperty("autoBalance", "leastconn");
assertEquals(0, ConnectionManager.getInstance()
.incrementCachedCreatingConnectionSize(new HostSpec("127.127.127.127", 93589), properties));
ConnectionManager.getInstance().clear();
}
@Test
public void decrementCachedCreatingConnectionSize() throws PSQLException {
HostSpec[] hostSpecs = initHostSpec();
Properties properties = initPriority(hostSpecs);
properties.setProperty("autoBalance", "leastconn");
ConnectionManager.getInstance().setCluster(properties);
assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(1, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], null));
assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties));
properties.setProperty("autoBalance", "");
assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties));
properties.setProperty("autoBalance", "leastconn");
assertEquals(0, ConnectionManager.getInstance()
.decrementCachedCreatingConnectionSize(new HostSpec("127.127.127.127", 93589), properties));
ConnectionManager.getInstance().clear();
}
}

View File

@ -0,0 +1,489 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.test.quickautobalance;
import org.junit.Test;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.StatementCancelState;
import org.postgresql.quickautobalance.ConnectionInfo;
import org.postgresql.quickautobalance.DataNode;
import org.postgresql.quickautobalance.DataNode.CheckDnStateResult;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
*
*/
public class DataNodeTest {
private static final int DN_NUM = 3;
private static final String FAKE_HOST = "127.127.217.217";
private static final String FAKE_PORT = "1";
private static final String FAKE_USER = "fakeuser";
private static final String FAKE_DATABASE = "fakedatabase";
private static final String FAKE_PASSWORD = "fakepassword";
private HostSpec initHost() {
return new HostSpec(TestUtil.getServer(), TestUtil.getPort());
}
private PgConnection getConnection(String url, Properties properties) throws SQLException {
Connection connection = DriverManager.getConnection(url, properties);
assertTrue(connection instanceof PgConnection);
return (PgConnection) connection;
}
private Properties initProperties() {
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
properties.setProperty("autoBalance", "leastconn");
return properties;
}
private HostSpec[] initHostSpecs() {
HostSpec[] hostSpecs = new HostSpec[DN_NUM];
hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2());
return hostSpecs;
}
private boolean checkHostSpecs(HostSpec[] hostSpecs) {
if (hostSpecs.length != DN_NUM) {
return false;
}
for (int i = 0; i < DN_NUM; i++) {
if ("localhost".equals(hostSpecs[i].getHost())) {
return false;
}
}
return true;
}
@Test
public void setConnectionStateTest() throws SQLException {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
Properties properties = initProperties();
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
dataNode.setConnectionState(pgConnection, StatementCancelState.IDLE);
assertEquals(StatementCancelState.IDLE, dataNode.getConnectionInfo(pgConnection).getConnectionState());
dataNode.setConnectionState(pgConnection, StatementCancelState.IN_QUERY);
assertEquals(StatementCancelState.IN_QUERY, dataNode.getConnectionInfo(pgConnection).getConnectionState());
dataNode.setConnectionState(pgConnection, StatementCancelState.CANCELLED);
assertEquals(StatementCancelState.CANCELLED, dataNode.getConnectionInfo(pgConnection).getConnectionState());
dataNode.setConnectionState(pgConnection, StatementCancelState.CANCELING);
assertEquals(StatementCancelState.CANCELING, dataNode.getConnectionInfo(pgConnection).getConnectionState());
}
@Test
public void setConnectionTest() throws SQLException {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
Properties properties = initProperties();
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection);
assertNotNull(connectionInfo);
assertEquals(pgConnection, connectionInfo.getPgConnection());
dataNode.setConnection(null, properties, hostSpec);
dataNode.setConnection(pgConnection, null, hostSpec);
dataNode.setConnection(pgConnection, properties, null);
}
@Test
public void getConnectionTest() throws SQLException {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
Properties properties = initProperties();
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection);
assertNotNull(connectionInfo);
assertEquals(pgConnection, connectionInfo.getPgConnection());
connectionInfo = dataNode.getConnectionInfo(null);
assertNull(connectionInfo);
}
@Test
public void getCachedConnectionListSizeTest() throws SQLException {
int num = 10;
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
Properties properties = initProperties();
assertEquals(0, dataNode.getCachedConnectionListSize());
for (int i = 0; i < num; i++) {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
}
int result = dataNode.getCachedConnectionListSize();
assertEquals(num, result);
}
@Test
public void checkDnStateWithPropertiesSuccessTest() {
HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
DataNode dataNode = new DataNode(hostSpec);
try {
assertTrue(dataNode.checkDnState(properties));
} catch (InvocationTargetException | PSQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void checkDnStateWithPropertiesUsernameOrPasswordErrorTest() {
HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", FAKE_USER);
properties.setProperty("password", FAKE_PASSWORD);
DataNode dataNode = new DataNode(hostSpec);
try {
dataNode.checkDnState(properties);
} catch (InvocationTargetException e) {
assertTrue(e.getCause() instanceof PSQLException);
if (e.getCause() instanceof PSQLException) {
PSQLException psqlException = (PSQLException) (e.getCause());
String sqlState = psqlException.getSQLState();
assertEquals("28P01", sqlState);
} else {
e.printStackTrace();
fail();
}
} catch (PSQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void checkDnStateWithPropertiesDatabaseErrorTest() {
// Invalid parameter "PGDBNAME" doesn't affect to tryConnect().
HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort());
Properties properties = new Properties();
properties.setProperty("PGDBNAME", FAKE_DATABASE);
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOST", TestUtil.getServer());
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
DataNode dataNode = new DataNode(hostSpec);
try {
assertTrue(dataNode.checkDnState(properties));
} catch (InvocationTargetException | PSQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void checkDnStateWithPropertiesConnectionFailedTest() {
HostSpec hostSpec = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT));
Properties properties = new Properties();
properties.setProperty("PGDBNAME", TestUtil.getDatabase());
properties.setProperty("PGHOSTURL", FAKE_HOST);
properties.setProperty("PGPORT", FAKE_PORT);
properties.setProperty("PGPORTURL", FAKE_PORT);
properties.setProperty("PGHOST", FAKE_HOST);
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
DataNode dataNode = new DataNode(hostSpec);
try {
dataNode.checkDnState(properties);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof PSQLException) {
PSQLException psqlException = (PSQLException) (e.getCause());
String SQLState = psqlException.getSQLState();
assertNotEquals("28P01", SQLState);
} else {
assertTrue(true);
}
} catch (PSQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void checkDnStateWithHostSpecSuccessTest() {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties clusterProperties = new Properties();
clusterProperties.setProperty("user", TestUtil.getUser());
clusterProperties.setProperty("password", TestUtil.getPassword());
clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
clusterProperties.setProperty("PGHOSTURL", pgHostUrl);
clusterProperties.setProperty("PGPORT", pgPortUrl);
clusterProperties.setProperty("PGPORTURL", pgPortUrl);
clusterProperties.setProperty("PGHOST", pgHostUrl);
DataNode dataNode = new DataNode(new HostSpec(TestUtil.getServer(), TestUtil.getPort()));
CheckDnStateResult result = dataNode.checkDnStateAndProperties(clusterProperties);
assertEquals(CheckDnStateResult.DN_VALID, result);
}
@Test
public void checkDnStateWithHostSpecUserOrPasswordExpiredTest() {
HostSpec[] hostSpecs = initHostSpecs();
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost();
String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort();
Properties invalidProperties = new Properties();
invalidProperties.setProperty("user", FAKE_USER);
invalidProperties.setProperty("password", FAKE_PASSWORD);
invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
invalidProperties.setProperty("PGHOSTURL", pgHostUrl);
invalidProperties.setProperty("PGPORT", pgPortUrl);
invalidProperties.setProperty("PGPORTURL", pgPortUrl);
invalidProperties.setProperty("PGHOST", pgHostUrl);
DataNode dataNode = new DataNode(new HostSpec(TestUtil.getServer(), TestUtil.getPort()));
CheckDnStateResult result = dataNode.checkDnStateAndProperties(invalidProperties);
assertEquals(CheckDnStateResult.PROPERTIES_INVALID, result);
}
@Test
public void checkDnStateWithHostSpecConnectFailedTest() {
HostSpec[] hostSpecs = initHostSpecs();
HostSpec hostSpec = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT));
if (!checkHostSpecs(hostSpecs)) {
return;
}
Arrays.sort(hostSpecs);
Properties clusterProperties = new Properties();
clusterProperties.setProperty("user", TestUtil.getUser());
clusterProperties.setProperty("password", TestUtil.getPassword());
clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase());
clusterProperties.setProperty("PGHOSTURL", FAKE_HOST);
clusterProperties.setProperty("PGPORT", FAKE_PORT);
clusterProperties.setProperty("PGPORTURL", FAKE_PORT);
clusterProperties.setProperty("PGHOST", FAKE_HOST);
DataNode dataNode = new DataNode(hostSpec);
CheckDnStateResult result = dataNode.checkDnStateAndProperties(clusterProperties);
assertEquals(CheckDnStateResult.DN_INVALID, result);
}
@Test
public void checkConnectionValidityTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
int total = 10;
int remove = 2;
List<PgConnection> pgConnections = new ArrayList<>();
Properties properties = initProperties();
try {
for (int i = 0; i < total; i++) {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
pgConnections.add(pgConnection);
}
assertEquals(0, dataNode.checkConnectionsValidity());
assertEquals(10, dataNode.getCachedConnectionListSize());
for (int i = 0; i < remove; i++) {
pgConnections.get(i).close();
}
assertEquals(2, dataNode.checkConnectionsValidity());
assertEquals(10 - 2, dataNode.getCachedConnectionListSize());
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
@Test
public void filterIdleConnectionsTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
int idleSize = 10;
int querySize = 20;
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
for (int i = 0; i < idleSize; i++) {
try {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
for (int i = 0; i < querySize; i++) {
try {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection);
connectionInfo.setConnectionState(StatementCancelState.IN_QUERY);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
List<ConnectionInfo> connectionInfos = dataNode.filterIdleConnections(System.currentTimeMillis());
assertEquals(idleSize, connectionInfos.size());
for (ConnectionInfo connectionInfo : connectionInfos) {
assertEquals(StatementCancelState.IDLE, connectionInfo.getConnectionState());
}
}
@Test
public void getAndSetDNStateTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
assertTrue(dataNode.getDataNodeState());
dataNode.setDataNodeState(false);
assertFalse(dataNode.getDataNodeState());
}
@Test
public void clearCachedConnectionsTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
int idleSize = 10;
Properties properties = initProperties();
properties.setProperty("enableQuickAutoBalance", "true");
properties.setProperty("maxIdleTimeBeforeTerminal", "1");
for (int i = 0; i < idleSize; i++) {
try {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
assertEquals(idleSize, dataNode.getCachedConnectionListSize());
dataNode.clearCachedConnections();
assertEquals(0, dataNode.getCachedConnectionListSize());
}
@Test
public void closeConnectionsTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
int total = 10;
int closed = 5;
Properties properties = initProperties();
List<PgConnection> connectionList = new ArrayList<>();
for (int i = 0; i < total; i++) {
try {
PgConnection pgConnection = getConnection(TestUtil.getURL(), properties);
dataNode.setConnection(pgConnection, properties, hostSpec);
connectionList.add(pgConnection);
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
assertEquals(total, dataNode.getCachedConnectionListSize());
for (int i = 0; i < total; i++) {
try {
assertTrue(connectionList.get(i).isValid(4));
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
for (int i = 0; i < closed; i++) {
assertTrue(dataNode.closeConnection(connectionList.get(i)));
}
assertEquals(total - closed, dataNode.getCachedConnectionListSize());
for (int i = 0; i < closed; i++) {
try {
assertFalse(connectionList.get(i).isValid(4));
} catch (SQLException e) {
e.printStackTrace();
fail();
}
}
}
@Test
public void getAndSetCachedCreatingConnectionSizeTest() {
HostSpec hostSpec = initHost();
DataNode dataNode = new DataNode(hostSpec);
assertEquals(0, dataNode.getCachedCreatingConnectionSize());
assertEquals(1, dataNode.incrementCachedCreatingConnectionSize());
assertEquals(1, dataNode.getCachedCreatingConnectionSize());
assertEquals(0, dataNode.decrementCachedCreatingConnectionSize());
assertEquals(0, dataNode.getCachedCreatingConnectionSize());
assertEquals(0, dataNode.decrementCachedCreatingConnectionSize());
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package org.postgresql.test.quickautobalance;
import org.junit.Test;
import org.postgresql.quickautobalance.LoadBalanceHeartBeating;
import org.postgresql.test.TestUtil;
import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* LoadBalanceHeartBeatingTest
*/
public class LoadBalanceHeartBeatingTest {
private Properties initProperties() {
Properties properties = new Properties();
properties.setProperty("PGPORTURL", TestUtil.getPort() + ","
+ TestUtil.getSecondaryPort() + "," + TestUtil.getSecondaryServerPort2());
properties.setProperty("PGHOSTURL", TestUtil.getServer() + ","
+ TestUtil.getSecondaryServer() + "," + TestUtil.getSecondaryServer2());
return properties;
}
@Test
public void startCheckConnectionScheduledExecutorServiceSuccessTest() {
Properties properties = initProperties();
properties.setProperty("autoBalance", "leastconn");
assertFalse(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.stop();
}
@Test
public void startCloseConnectionExecutorServiceSuccessTest() {
Properties properties = initProperties();
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("enableQuickAutoBalance", "true");
assertFalse(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertTrue(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertTrue(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.stop();
}
@Test
public void startCloseConnectionExecutorServiceFailedTest() {
Properties properties = initProperties();
properties.setProperty("autoBalance", "leastconn");
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
properties.setProperty("enableQuickAutoBalance", "fsfsfs");
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
properties.setProperty("enableQuickAutoBalance", "false");
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertTrue(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.stop();
}
@Test
public void startExecutorServiceWithSingleHostTest() {
Properties properties = new Properties();
properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort()));
properties.setProperty("PGHOSTURL", TestUtil.getServer());
properties.setProperty("autoBalance", "leastconn");
properties.setProperty("enableQuickAutoBalance", "true");
assertFalse(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertFalse(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
LoadBalanceHeartBeating.startScheduledExecutorService(properties);
assertFalse(LoadBalanceHeartBeating.isLeastConnStarted());
assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted());
}
}