diff --git a/pgjdbc/src/main/java/org/postgresql/Driver.java b/pgjdbc/src/main/java/org/postgresql/Driver.java index 63a8fa7..c57d9c6 100755 --- a/pgjdbc/src/main/java/org/postgresql/Driver.java +++ b/pgjdbc/src/main/java/org/postgresql/Driver.java @@ -11,6 +11,8 @@ import org.postgresql.jdbc.PgConnection; import org.postgresql.log.Logger; import org.postgresql.log.Log; import org.postgresql.log.Tracer; +import org.postgresql.quickautobalance.ConnectionManager; +import org.postgresql.quickautobalance.LoadBalanceHeartBeating; import org.postgresql.util.DriverInfo; import org.postgresql.util.GT; import org.postgresql.util.HostSpec; @@ -558,8 +560,12 @@ public class Driver implements java.sql.Driver { * @throws SQLException if the connection could not be made */ private static Connection makeConnection(String url, Properties props) throws SQLException { + ConnectionManager.getInstance().setCluster(props); PgConnection pgConnection = new PgConnection(hostSpecs(props), user(props), database(props), props, url); GlobalConnectionTracker.possessConnectionReference(pgConnection.getQueryExecutor(), props); + if (ConnectionManager.getInstance().setConnection(pgConnection, props)) { + LoadBalanceHeartBeating.startScheduledExecutorService(props); + } return pgConnection; } diff --git a/pgjdbc/src/main/java/org/postgresql/PGProperty.java b/pgjdbc/src/main/java/org/postgresql/PGProperty.java index c219048..dced34c 100644 --- a/pgjdbc/src/main/java/org/postgresql/PGProperty.java +++ b/pgjdbc/src/main/java/org/postgresql/PGProperty.java @@ -472,7 +472,43 @@ public enum PGProperty { + "which will allow the connection to be used for logical replication " + "from that database. " + "(backend >= 9.4)"), - + + /** + * Enable quick auto balancing. + * + */ + ENABLE_QUICK_AUTO_BALANCE("enableQuickAutoBalance", "false", + "If the connection enable quickAutoBalance, this parameter only takes effect when autoBalance=leastconn." + + "value: true or false.", + false, "true", "false"), + + /** + * Idle time threshold of connections when quick auto balancing filters connections. + */ + MAX_IDLE_TIME_BEFORE_TERMINAL("maxIdleTimeBeforeTerminal", "30", + "During quick load balancing, if the connection is idle for more than maxIdleTimeBeforeTerminal seconds, " + + "the connection may be closed to achieve load balancing for each data node." + + "Value range: long && [0, Long.MAX_VALUE / 1000]." + + "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"), + + /** + * Percentage of min reserved connections pre cluster when executing quick auto balancing. + */ + MIN_RESERVED_CON_PER_CLUSTER("minReservedConPerCluster", null, + "Percentage of min reserved connections pre cluster when executing quick auto balancing, " + + "jdbc will retain minReservedConPerCluster percent of the connections per cluster that meet the closing conditions during quick auto balancing." + + "Value range: int && [0, 100]." + + "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"), + + /** + * Percentage of min reserved connections pre data node when executing quick auto balancing. + */ + MIN_RESERVED_CON_PER_DATANODE("minReservedConPerDatanode", null, + "Percentage of min reserved connections pre data node when executing quick auto balancing, " + + "jdbc will retain minReservedConPerCluster percent of the connections pre data node that meet the closing conditions during quick auto balancing." + + "Value range: int && [0, 100]." + + "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"), + /** * Supported TLS cipher suites */ diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java index d9983d2..50aa32c 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java +++ b/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java @@ -20,6 +20,7 @@ import org.postgresql.core.Version; import org.postgresql.hostchooser.*; import org.postgresql.jdbc.SslMode; import org.postgresql.QueryCNListUtils; +import org.postgresql.quickautobalance.ConnectionManager; import org.postgresql.util.*; import org.postgresql.log.Logger; import org.postgresql.log.Log; @@ -214,6 +215,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { HostChooserFactory.createHostChooser(currentHostSpecs, targetServerType, info); Iterator hostIter = hostChooser.iterator(); boolean isMasterCluster = false; + boolean isFirstIter = true; while (hostIter.hasNext()) { CandidateHost candidateHost = hostIter.next(); HostSpec hostSpec = candidateHost.hostSpec; @@ -223,13 +225,19 @@ public class ConnectionFactoryImpl extends ConnectionFactory { // In that case, the system tries to connect to each host in order, thus it should not look into // GlobalHostStatusTracker HostStatus knownStatus = knownStates.get(hostSpec); + if (isFirstIter) { + isFirstIter = false; + } else { + ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpec, info); + } if (knownStatus != null && !candidateHost.targetServerType.allowConnectingTo(knownStatus)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Known status of host " + hostSpec + " is " + knownStatus + ", and required status was " + candidateHost.targetServerType + ". Will try next host"); } + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); continue; } - + // // Establish a connection. // @@ -308,6 +316,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { GlobalClusterStatusTracker.reportMasterCluster(info, clusterSpec); } else { queryExecutor.close(); + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); break; } } @@ -322,6 +331,7 @@ public class ConnectionFactoryImpl extends ConnectionFactory { knownStates.put(hostSpec, hostStatus); if (!candidateHost.targetServerType.allowConnectingTo(hostStatus)) { queryExecutor.close(); + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); continue; } @@ -349,12 +359,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory { if (hostIter.hasNext() || clusterIter.hasNext()) { LOGGER.info("ConnectException occured while connecting to {0}" + hostSpec, cex); exception.addSuppressed(cex); + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); // still more addresses to try continue; } if (exception.getSuppressed().length > 0) { cex.addSuppressed(exception); } + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); throw new PSQLException(GT.tr( "Connection to {0} refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.", hostSpec), PSQLState.CONNECTION_UNABLE_TO_CONNECT, cex); @@ -365,12 +377,14 @@ public class ConnectionFactoryImpl extends ConnectionFactory { if (hostIter.hasNext() || clusterIter.hasNext()) { LOGGER.info("IOException occured while connecting to " + hostSpec, ioe); exception.addSuppressed(ioe); + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); // still more addresses to try continue; } if (exception.getSuppressed().length > 0) { ioe.addSuppressed(exception); } + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); throw new PSQLException(GT.tr("The connection attempt failed."), PSQLState.CONNECTION_UNABLE_TO_CONNECT, ioe); } catch (SQLException se) { @@ -381,11 +395,13 @@ public class ConnectionFactoryImpl extends ConnectionFactory { LOGGER.info("SQLException occured while connecting to " + hostSpec, se); exception.addSuppressed(se); // still more addresses to try + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); continue; } if (exception.getSuppressed().length > 0) { se.addSuppressed(exception); } + ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpec, info); throw se; } } diff --git a/pgjdbc/src/main/java/org/postgresql/hostchooser/MultiHostChooser.java b/pgjdbc/src/main/java/org/postgresql/hostchooser/MultiHostChooser.java index 25c519d..9ec2f97 100644 --- a/pgjdbc/src/main/java/org/postgresql/hostchooser/MultiHostChooser.java +++ b/pgjdbc/src/main/java/org/postgresql/hostchooser/MultiHostChooser.java @@ -12,6 +12,8 @@ import org.postgresql.PGProperty; import org.postgresql.log.Log; import org.postgresql.log.Logger; import org.postgresql.QueryCNListUtils; +import org.postgresql.quickautobalance.ConnectionManager; +import org.postgresql.quickautobalance.Cluster; import org.postgresql.util.HostSpec; import org.postgresql.util.PSQLException; @@ -95,6 +97,7 @@ public class MultiHostChooser implements HostChooser { allHosts = priorityRoundRobin(allHosts); break; case LeastConn: + allHosts = leastConn(allHosts); break; default: isOutPutLog = false; @@ -108,6 +111,7 @@ public class MultiHostChooser implements HostChooser { } return allHosts; } + // Returns a counter and increments it by one. // Because it is possible to use it in multiple instances, use synchronized (MultiHostChooser.class). private int getRRIndex() { @@ -137,6 +141,17 @@ public class MultiHostChooser implements HostChooser { Collections.shuffle(result.subList(1, result.size())); return result; } + + private List leastConn(List hostSpecs) { + if (hostSpecs.size() <= 1) { + return hostSpecs; + } + Cluster cluster = ConnectionManager.getInstance().getCluster(URLIdentifier); + if (cluster == null) { + return hostSpecs; + } + return cluster.sortDnsByLeastConn(hostSpecs); + } /* * Use for RR algorithm. In case of first CN is not been connected, jdbc will diff --git a/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java b/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java index 7abe182..5d446ee 100644 --- a/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java +++ b/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java @@ -7,6 +7,8 @@ package org.postgresql.jdbc; import org.postgresql.Driver; import org.postgresql.core.*; +import org.postgresql.quickautobalance.ConnectionManager; +import org.postgresql.quickautobalance.LoadBalanceHeartBeating; import org.postgresql.util.GT; import org.postgresql.util.PGbytea; import org.postgresql.util.PSQLException; @@ -22,7 +24,6 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.TimerTask; -import org.postgresql.core.v3.ConnectionFactoryImpl; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -301,7 +302,7 @@ public class PgStatement implements Statement, BaseStatement { public boolean executeWithFlags(String sql, int flags) throws SQLException { return executeCachedSql(sql, flags, NO_RETURNING_COLUMNS); } - + private boolean executeCachedSql(String sql, int flags, String[] columnNames) throws SQLException { PreferQueryMode preferQueryMode = connection.getPreferQueryMode(); // Simple statements should not replace ?, ? with $1, $2 @@ -1117,12 +1118,14 @@ public class PgStatement implements Statement, BaseStatement { // Not in query, there's nothing to cancel return; } + setConnectionState(StatementCancelState.CANCELING); // Synchronize on connection to avoid spinning in killTimerTask synchronized (connection) { try { connection.cancelQuery(); } finally { STATE_UPDATER.set(this, StatementCancelState.CANCELLED); + setConnectionState(StatementCancelState.CANCELLED); connection.notifyAll(); // wake-up killTimerTask } } @@ -1166,14 +1169,14 @@ public class PgStatement implements Statement, BaseStatement { fetchSize = rows; } - private void startTimer() { + private void startTimer() throws SQLException { /* * there shouldn't be any previous timer active, but better safe than sorry. */ cleanupTimer(); STATE_UPDATER.set(this, StatementCancelState.IN_QUERY); - + setConnectionState(StatementCancelState.IN_QUERY); if (timeout == 0) { return; } @@ -1218,13 +1221,20 @@ public class PgStatement implements Statement, BaseStatement { return true; } - private void killTimerTask() { + private void setConnectionState(StatementCancelState state) throws SQLException { + if (LoadBalanceHeartBeating.isLoadBalanceHeartBeatingStarted() && this.connection instanceof PgConnection) { + ConnectionManager.getInstance().setConnectionState(this.connection.unwrap(PgConnection.class), state); + } + } + + private void killTimerTask() throws SQLException { boolean timerTaskIsClear = cleanupTimer(); // The order is important here: in case we need to wait for the cancel task, the state must be // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query. // It is believed that this case is very rare, so "additional cancel and wait below" would not // harm it. if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) { + setConnectionState(StatementCancelState.IDLE); return; } @@ -1245,6 +1255,7 @@ public class PgStatement implements Statement, BaseStatement { interrupted = true; } } + setConnectionState(StatementCancelState.IDLE); } if (interrupted) { Thread.currentThread().interrupt(); diff --git a/pgjdbc/src/main/java/org/postgresql/jdbc/StatementCancelState.java b/pgjdbc/src/main/java/org/postgresql/jdbc/StatementCancelState.java index 9cf623d..c5400c7 100644 --- a/pgjdbc/src/main/java/org/postgresql/jdbc/StatementCancelState.java +++ b/pgjdbc/src/main/java/org/postgresql/jdbc/StatementCancelState.java @@ -8,7 +8,7 @@ package org.postgresql.jdbc; /** * Represents {@link PgStatement#cancel()} state. */ -enum StatementCancelState { +public enum StatementCancelState { IDLE, IN_QUERY, CANCELING, diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/Cluster.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/Cluster.java new file mode 100644 index 0000000..44592fc --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/Cluster.java @@ -0,0 +1,626 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.Driver; +import org.postgresql.PGProperty; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.quickautobalance.DataNode.CheckDnStateResult; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.Queue; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A Cluster, which cached the information of the dataNodes in this cluster. + */ +public class Cluster { + private static Log LOGGER = Logger.getLogger(Cluster.class.getName()); + + private static final double CLOSE_CONNECTION_PERCENTAGE_EACH_TIME = 0.2d; + + private final String urlIdentifier; + + private final Set dns; + + private final Queue abandonedConnectionList; + + private final Map cachedDnList; + + private final List cachedPropertiesList; + + // Percentage of minimum reserved connections that can be closed in a cluster, value range: [0,100]. + private volatile int minReservedConPerCluster; + + private volatile boolean enableMinReservedConPerCluster; + + // Percentage of minimum reserved connections that can be closed in a datanode, value range: [0,100]. + private volatile int minReservedConPerDatanode; + + private volatile boolean enableMinReservedConPerDatanode; + + private volatile long quickAutoBalanceStartTime; + + private int totalAbandonedConnectionSize = 0; + + public Cluster(final String urlIdentifier, final Properties properties) throws PSQLException { + this.urlIdentifier = urlIdentifier; + HostSpec[] hostSpecs = Driver.getURLHostSpecs(properties); + this.dns = new HashSet<>(); + this.dns.addAll(Arrays.asList(hostSpecs)); + this.cachedDnList = new ConcurrentHashMap<>(); + for (HostSpec hostSpec : hostSpecs) { + DataNode dataNode = new DataNode(hostSpec); + this.cachedDnList.put(hostSpec, dataNode); + } + updateParams(properties); + this.abandonedConnectionList = new ConcurrentLinkedQueue<>(); + this.cachedPropertiesList = new Vector<>(); + this.cachedPropertiesList.add(properties); + this.quickAutoBalanceStartTime = 0; + this.totalAbandonedConnectionSize = 0; + } + + /** + * set connection state of cached connections if it exists. + * + * @param pgConnection pgConnection + * @param state new state + */ + public void setConnectionState(final PgConnection pgConnection, final StatementCancelState state) { + String socketAddress = pgConnection.getSocketAddress(); + HostSpec hostSpec = calculateHostSpec(socketAddress); + if (hostSpec != null && !dns.contains(hostSpec)) { + return; + } + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode != null) { + dataNode.setConnectionState(pgConnection, state); + } + } + + // calculate the hostSpec of destination + private HostSpec calculateHostSpec(String socketAddress) { + String urlClient = socketAddress.split("/")[1]; + String[] urlClientSplit = urlClient.split(":"); + if (urlClientSplit.length == 2) { + String host = urlClientSplit[0]; + int port = Integer.parseInt(urlClientSplit[1]); + return new HostSpec(host, port); + } else { + return null; + } + + } + + /** + * Add a new connection. + * + * @param pgConnection pgConnection + * @param properties properties + */ + public void setConnection(final PgConnection pgConnection, final Properties properties) + throws PSQLException { + if (pgConnection == null || properties == null) { + return; + } + String socketAddress = pgConnection.getSocketAddress(); + HostSpec hostSpec = calculateHostSpec(socketAddress); + if (hostSpec != null && !dns.contains(hostSpec)) { + return; + } + setProperties(properties); + synchronized (cachedDnList) { + cachedDnList.get(hostSpec).setConnection(pgConnection, properties, hostSpec); + decrementCachedCreatingConnectionSize(hostSpec); + updateParams(properties); + } + } + + /** + * Set new properties if the user doesn't exist in cachedPropertiesList, + * or update properties if exists. + * + * @param properties properties + */ + public void setProperties(Properties properties) { + synchronized (cachedPropertiesList) { + for (int i = 0; i < cachedPropertiesList.size(); i++) { + Properties prop = cachedPropertiesList.get(i); + if (prop.getProperty("user", "").equals(properties.getProperty("user", null))) { + cachedPropertiesList.set(i, properties); + return; + } + } + cachedPropertiesList.add(properties); + } + } + + /** + * CacheCreatingConnectionNum - 1; + * + * @param hostSpec hostSpec + * @return cachedCreatingConnectionNum + */ + public int decrementCachedCreatingConnectionSize(final HostSpec hostSpec) { + if (!cachedDnList.containsKey(hostSpec)) { + LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", hostSpec.toString(), urlIdentifier)); + return 0; + } + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode != null) { + return dataNode.decrementCachedCreatingConnectionSize(); + } else { + LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", hostSpec.toString(), urlIdentifier)); + return 0; + } + } + + private void updateMinReservedConPerCluster(Properties properties) throws PSQLException { + int perCluster = 0; + String param = PGProperty.MIN_RESERVED_CON_PER_CLUSTER.get(properties); + if (param == null) { + return; + } + try { + perCluster = Integer.parseInt(param); + } catch (NumberFormatException e) { + throw new PSQLException(GT.tr("Parameter minReservedConPerCluster={0} parsed failed, value range: int && [0, 100]." + , PGProperty.MIN_RESERVED_CON_PER_CLUSTER.get(properties)), PSQLState.INVALID_PARAMETER_TYPE); + } + if (perCluster < 0 || perCluster > 100) { + throw new PSQLException(GT.tr("Parameter minReservedConPerCluster={0} parsed failed, value range: int && [0, 100]." + , perCluster), PSQLState.INVALID_PARAMETER_VALUE); + } else { + if (!this.enableMinReservedConPerCluster) { + this.enableMinReservedConPerCluster = true; + this.minReservedConPerCluster = perCluster; + } else { + this.minReservedConPerCluster = Math.min(this.minReservedConPerCluster, perCluster); + } + } + } + + private void updateMinReservedConPerDatanode(Properties properties) throws PSQLException { + int perDatanode = 0; + String param = PGProperty.MIN_RESERVED_CON_PER_DATANODE.get(properties); + if (param == null) { + return; + } + try { + perDatanode = Integer.parseInt(param); + } catch (NumberFormatException e) { + throw new PSQLException(GT.tr("Parameter minReservedConPerDatanode={0} parsed failed, value range: int && [0, 100]." + , PGProperty.MIN_RESERVED_CON_PER_DATANODE.get(properties)), PSQLState.INVALID_PARAMETER_TYPE); + } + + if (perDatanode < 0 || perDatanode > 100) { + throw new PSQLException(GT.tr("Parameter minReservedConPerDatanode={0} parsed failed, value range: int && [0, 100].", perDatanode), PSQLState.INVALID_PARAMETER_VALUE); + } else { + if (!this.enableMinReservedConPerDatanode) { + this.enableMinReservedConPerDatanode = true; + this.minReservedConPerDatanode = perDatanode; + } else { + this.minReservedConPerDatanode = Math.min(this.minReservedConPerDatanode, perDatanode); + } + } + } + + private void updateParams(Properties properties) throws PSQLException { + updateMinReservedConPerCluster(properties); + updateMinReservedConPerDatanode(properties); + } + + /** + * Get connection info of the connection if exists. + * + * @param connection connection + * @return connection info + */ + public ConnectionInfo getConnectionInfo(PgConnection connection) { + String socketAddress = connection.getSocketAddress(); + HostSpec hostSpec = calculateHostSpec(socketAddress); + DataNode dataNode = cachedDnList.get(hostSpec); + return hostSpec != null && dataNode != null ? dataNode.getConnectionInfo(connection) : null; + } + + /** + * Sort hostSpec list in ascending order by amount of connections. + * Put the hostSpec to the tail, if dataNodeState = false. + * + * @param hostSpecs host specs + * @return hostSpec list + */ + public List sortDnsByLeastConn(List hostSpecs) { + Map dataNodeCompareInfoMap; + // Copy dataNodeCompareInfo from cachedDnList. + synchronized (cachedDnList) { + dataNodeCompareInfoMap = hostSpecs.stream() + .collect(Collectors.toMap(Function.identity(), hostSpec -> { + DataNode dataNode = cachedDnList.get(hostSpec); + int cachedConnectionListSize = dataNode.getCachedConnectionListSize(); + int cachedCreatingConnectionSize = dataNode.getCachedCreatingConnectionSize(); + boolean dataNodeState = dataNode.getDataNodeState(); + return new DataNodeCompareInfo(cachedConnectionListSize, cachedCreatingConnectionSize, dataNodeState); + })); + } + hostSpecs.sort((o1, o2) -> { + boolean o1State = dataNodeCompareInfoMap.get(o1).getDataNodeState(); + boolean o2State = dataNodeCompareInfoMap.get(o2).getDataNodeState(); + if (!o1State && o2State) { + return 1; + } + if (!o2State && o1State) { + return -1; + } + int o1ConnectionSize = dataNodeCompareInfoMap.get(o1).getConnectionListSize() + dataNodeCompareInfoMap.get(o1).getCachedCreatedConnectionSize(); + int o2ConnectionSize = dataNodeCompareInfoMap.get(o2).getConnectionListSize() + dataNodeCompareInfoMap.get(o2).getCachedCreatedConnectionSize(); + return o1ConnectionSize - o2ConnectionSize; + }); + if (hostSpecs.get(0) != null) { + this.cachedDnList.get(hostSpecs.get(0)).incrementCachedCreatingConnectionSize(); + } + LOGGER.info(GT.tr("SortDnsByLeastConn: {0}." + , dataNodeCompareInfoMap)); + return hostSpecs; + } + + /** + * The necessary info when sorting data nodes. + */ + class DataNodeCompareInfo { + int connectionListSize; + int cachedCreatedConnectionSize; + boolean dataNodeState; + + public DataNodeCompareInfo(final int connectionListSize, final int cachedCreatedConnectionSize, final boolean dataNodeState) { + this.connectionListSize = connectionListSize; + this.cachedCreatedConnectionSize = cachedCreatedConnectionSize; + this.dataNodeState = dataNodeState; + } + + /** + * Get connectionList size. + * + * @return size of connectionList + */ + public int getConnectionListSize() { + return connectionListSize; + } + + /** + * Get cached created connection size. + * + * @return cachedCreateConnectionSize + */ + public int getCachedCreatedConnectionSize() { + return cachedCreatedConnectionSize; + } + + /** + * Get data node state. + * + * @return dataNodeState + */ + public boolean getDataNodeState() { + return dataNodeState; + } + + @Override + public String toString() { + return "{" + + "connectionListSize=" + connectionListSize + + ", cachedCreatedConnectionSize=" + cachedCreatedConnectionSize + + ", dataNodeState=" + dataNodeState + + '}'; + } + } + + /** + * Check each data nodes' validity of cluster, + * use cached properties to tryConnect to each data nodes, + * if cached properties is invalid, remove cached properties, + * if state change to valid from invalid, quick load balancing start, + * if state change to invalid from valid, clear cached connections of this node. + * + * @return the number of invalid data nodes + */ + public int checkClusterState() { + // Count the state of all data nodes before and after checking cluster state. + Map oldStates = cachedDnList.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, + (val) -> val.getValue().getDataNodeState())); + Map newStates = cachedDnList.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, + (val) -> this.checkDnState(val.getKey()))); + Map> checkResult = new HashMap<>(); + for (DataNodeChangedState dataNodeChangedState : DataNodeChangedState.values()) { + checkResult.put(dataNodeChangedState, new ArrayList<>()); + } + for (Map.Entry entry : cachedDnList.entrySet()) { + HostSpec hostSpec = entry.getKey(); + boolean oldState = oldStates.get(hostSpec); + boolean newState = newStates.get(hostSpec); + if (oldState && !newState) { + // If data node fails, clear its cacheConnectionList. + int removed = cachedDnList.get(hostSpec).clearCachedConnections(); + checkResult.get(DataNodeChangedState.CHANGE_TO_INVALID).add(hostSpec); + LOGGER.info(GT.tr("A data node failed, clear cached connections, cluster: {0}, " + + "hostSpec: {1}, cached connections: {2}.", + urlIdentifier, hostSpec.toString(), removed)); + } else if (!oldState && newState) { + checkResult.get(DataNodeChangedState.CHANGE_TO_VALID).add(hostSpec); + } else if (oldState) { + checkResult.get(DataNodeChangedState.KEEP_VALID).add(hostSpec); + } else { + checkResult.get(DataNodeChangedState.KEEP_INVALID).add(hostSpec); + } + } + LOGGER.info(GT.tr("Check cluster states in cluster: {0}, result: {1}.", + urlIdentifier, checkResult.toString())); + // Start to quickLoadBalance. + if (checkResult.get(DataNodeChangedState.CHANGE_TO_VALID).size() != 0 + && LoadBalanceHeartBeating.isQuickAutoBalanceStarted()) { + quickLoadBalance(checkResult.get(DataNodeChangedState.KEEP_VALID)); + } + return checkResult.get(DataNodeChangedState.KEEP_INVALID).size() + + checkResult.get(DataNodeChangedState.CHANGE_TO_INVALID).size(); + } + + enum DataNodeChangedState { + KEEP_VALID, KEEP_INVALID, CHANGE_TO_VALID, CHANGE_TO_INVALID + } + + /** + * Check dn state by cached properties, and remove invalid properties. + * + * @param hostSpec hostSpec + * @return state of the date node + */ + public boolean checkDnState(HostSpec hostSpec) { + synchronized (cachedPropertiesList) { + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode == null) { + return false; + } + for (Iterator iterator = cachedPropertiesList.iterator(); iterator.hasNext(); ) { + Properties properties = iterator.next(); + CheckDnStateResult result = dataNode.checkDnStateAndProperties(properties); + if (CheckDnStateResult.DN_VALID.equals(result)) { + dataNode.setDataNodeState(true); + return true; + } else if (CheckDnStateResult.DN_INVALID.equals(result)) { + dataNode.setDataNodeState(false); + return false; + } else { + iterator.remove(); + } + } + dataNode.setDataNodeState(false); + return false; + } + } + + private int quickLoadBalance(List validDns) { + synchronized (abandonedConnectionList) { + this.quickAutoBalanceStartTime = System.currentTimeMillis(); + // the connections added into abandonedConnectionList + int removed = 0; + // cachedConnectionList size + int total = 0; + // idle connection filtered from cachedConnectionList + int idle = 0; + int minReservedConnectionPercentage; + if (!enableMinReservedConPerCluster) { + minReservedConnectionPercentage = minReservedConPerDatanode; + } else if (!enableMinReservedConPerDatanode) { + minReservedConnectionPercentage = minReservedConPerCluster; + } else { + minReservedConnectionPercentage = Math.max(minReservedConPerDatanode, minReservedConPerCluster); + } + for (Entry entry : cachedDnList.entrySet()) { + DataNode dataNode = entry.getValue(); + if (dataNode != null) { + total += dataNode.getCachedConnectionListSize(); + } + } + // Start to quickLoadBalance. + HashSet removedConnectionList = new HashSet<>(); + for (HostSpec hostSpec : validDns) { + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode != null) { + List idleConnections = dataNode.filterIdleConnections(quickAutoBalanceStartTime); + idle += idleConnections.size(); + int removedConnectionsSize = (int) (idleConnections.size() * (((double) (100 - minReservedConnectionPercentage)) / 100.0)); + for (int i = 0; i < removedConnectionsSize; i++) { + removedConnectionList.add(idleConnections.get(i)); + removed++; + } + } + } + this.abandonedConnectionList.clear(); + this.abandonedConnectionList.addAll(removedConnectionList); + this.totalAbandonedConnectionSize = abandonedConnectionList.size(); + LOGGER.info(GT.tr("QuickLoadBalancing executes in cluster: {0}, " + + "put {1} idle connections into abandonedConnectionList, connections can be closed: {2}, " + + "total connection: {3}, minReservedConPerCluster: {4}, minReservedConPerDatanode: {5}.", + urlIdentifier, removed, idle, total, this.minReservedConPerCluster, this.minReservedConPerDatanode)); + return removed; + } + } + + /** + * Check cached connections validity, and remove invalid connections. + * + * @return the amount of removed connections of each dn. + */ + public List checkConnectionsValidity() { + List ans = new ArrayList<>(); + for (Entry entry : cachedDnList.entrySet()) { + DataNode dataNode = entry.getValue(); + ans.add(dataNode.checkConnectionsValidity()); + } + return ans; + } + + /** + * CachedCreatingConnection + 1 + * + * @param hostSpec hostSpec + * @return cachedCreatingConnection + */ + public int incrementCachedCreatingConnectionSize(final HostSpec hostSpec) { + if (!cachedDnList.containsKey(hostSpec)) { + LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", + hostSpec.toString(), urlIdentifier)); + return 0; + } + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode != null) { + return dataNode.incrementCachedCreatingConnectionSize(); + } else { + LOGGER.error(GT.tr("Can not find hostSpec: {0} in Cluster: {1}.", + hostSpec.toString(), urlIdentifier)); + return 0; + } + } + + /** + * Get minReservedConPerCluster. + * + * @return minReservedConPerCluster + */ + public int getMinReservedConPerCluster() { + return minReservedConPerCluster; + } + + /** + * Get enableMinReservedConPerCluster. + * + * @return enableMinReservedConPerCluster + */ + public boolean isEnableMinReservedConPerCluster() { + return enableMinReservedConPerCluster; + } + + /** + * Get minReservedConPerDatanode. + * + * @return minReservedConPerDatanode + */ + public int getMinReservedConPerDatanode() { + return minReservedConPerDatanode; + } + + /** + * Get enableMInReservedConPerDatanode. + * + * @return enableMInReservedConPerDatanode + */ + public boolean isEnableMinReservedConPerDatanode() { + return enableMinReservedConPerDatanode; + } + + @Override + public int hashCode() { + return Objects.hash(urlIdentifier, dns, abandonedConnectionList, cachedDnList, cachedPropertiesList); + } + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + final Cluster that = (Cluster) o; + return Objects.equals(urlIdentifier, that.urlIdentifier) && Objects.equals(dns, that.dns); + } + + /** + * Close connections from abandonedConnectionList. + * Jdbc will continue to pop connections from abandonedConnectionList and determine whether each connection can be closed. + * If it does, the connection will be close. If it does not, the connection won't be put back into abandonedConnectionList. + * The number of connections closed at each cluster for each scheduled task at most: 20% * totalAbandonedConnectionSize. + * + * @return the number of connections which are closed + */ + public int closeConnections() { + int closed = 0; + double ceilError = 0.001; + int atMost = (int) (Math.ceil(CLOSE_CONNECTION_PERCENTAGE_EACH_TIME * totalAbandonedConnectionSize) + + ceilError); + synchronized (abandonedConnectionList) { + if (abandonedConnectionList.isEmpty()) { + return closed; + } + int oldSize = abandonedConnectionList.size(); + while (!abandonedConnectionList.isEmpty() && closed < atMost) { + ConnectionInfo connectionInfo = abandonedConnectionList.poll(); + HostSpec hostSpec = connectionInfo.getHostSpec(); + // The connections shouldn't be null. + if (hostSpec == null) { + continue; + } + // The connections should be valid. + if (!connectionInfo.checkConnectionIsValid()) { + continue; + } + // The state of connection may change after put into abandonedConnectionList, + // so it's necessary to recheck it which can be closed. + if (!connectionInfo.checkConnectionCanBeClosed(quickAutoBalanceStartTime)) { + continue; + } + DataNode dataNode = cachedDnList.get(hostSpec); + if (dataNode == null) { + continue; + } + boolean hasClosed = dataNode.closeConnection(connectionInfo.getPgConnection()); + if (hasClosed) { + closed++; + } + } + if (abandonedConnectionList.isEmpty()) { + this.quickAutoBalanceStartTime = 0; + this.totalAbandonedConnectionSize = 0; + } + LOGGER.info(GT.tr("Close connections execute in cluster: {0}, closed connections: {1}, " + + "size of abandonedConnectionList before closing: {2}," + + " size of abandonedConnectionList after closing: {3}.", + urlIdentifier, closed, oldSize, abandonedConnectionList.size())); + } + return closed; + } +} diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionInfo.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionInfo.java new file mode 100644 index 0000000..cecd7b2 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionInfo.java @@ -0,0 +1,246 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.PGProperty; +import org.postgresql.core.QueryExecutor; +import org.postgresql.core.SetupQueryRunner; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Objects; +import java.util.Properties; + +/** + * Connection info used in quick auto balance. + */ +public class ConnectionInfo { + /** + * Default maxIdleTimeBeforeTerminal. + */ + public static final long DEFAULT_MAX_IDLE_TIME_BEFORE_TERMINAL = 30L; + + public static final String ENABLE_QUICK_AUTO_BALANCE_PARAMS = "true"; + + private static Log LOGGER = Logger.getLogger(ConnectionInfo.class.getName()); + + private final PgConnection pgConnection; + + private final long createTimeStamp; + + private final String autoBalance; + + private boolean enableQuickAutoBalance; + + // max idle time of connection, units: second + private long maxIdleTimeBeforeTerminal; + + private final HostSpec hostSpec; + + private volatile StatementCancelState connectionState; + + // the timestamp when state change last time + private volatile long stateLastChangedTimeStamp; + + @Override + public int hashCode() { + return Objects.hash(pgConnection, createTimeStamp, autoBalance, enableQuickAutoBalance, + maxIdleTimeBeforeTerminal, hostSpec); + } + + public ConnectionInfo(PgConnection pgConnection, Properties properties, HostSpec hostSpec) + throws PSQLException { + this.pgConnection = pgConnection; + this.connectionState = StatementCancelState.IDLE; + this.createTimeStamp = System.currentTimeMillis(); + this.stateLastChangedTimeStamp = createTimeStamp; + this.autoBalance = properties.getProperty("autoBalance", ""); + this.maxIdleTimeBeforeTerminal = DEFAULT_MAX_IDLE_TIME_BEFORE_TERMINAL; + this.hostSpec = hostSpec; + parseEnableQuickAutoBalance(properties); + parseMaxIdleTimeBeforeTerminal(properties); + } + + private void parseEnableQuickAutoBalance(Properties properties) throws PSQLException { + if (EnableQuickAutoBalanceParams.TRUE.getValue() + .equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) { + this.enableQuickAutoBalance = true; + } else if (EnableQuickAutoBalanceParams.FALSE.getValue() + .equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) { + this.enableQuickAutoBalance = false; + } else { + throw new PSQLException( + GT.tr("Parameter enableQuickAutoBalance={0} parsed failed, value range: '{true, false'}).", + PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties)), PSQLState.INVALID_PARAMETER_VALUE); + } + } + + private void parseMaxIdleTimeBeforeTerminal(Properties properties) throws PSQLException { + long inputMaxIdleTime; + try { + String param = PGProperty.MAX_IDLE_TIME_BEFORE_TERMINAL.get(properties); + inputMaxIdleTime = Long.parseLong(param); + if (inputMaxIdleTime >= Long.MAX_VALUE / 1000) { + throw new PSQLException( + GT.tr("Parameter maxIdleTimeBeforeTerminal={0} can not be bigger than {1}, value range: long & [0,{1}).", + inputMaxIdleTime, Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_VALUE); + } + if (inputMaxIdleTime < 0) { + throw new PSQLException( + GT.tr("Parameter maxIdleTimeBeforeTerminal={0} can not be less than 0, value range: long & [0,{1}).", + inputMaxIdleTime, Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_VALUE); + + } + } catch (NumberFormatException e) { + throw new PSQLException( + GT.tr("Parameter maxIdleTimeBeforeTerminal parsed failed, value range: long & [0,{0}).", + Long.MAX_VALUE / 1000), PSQLState.INVALID_PARAMETER_TYPE); + } + this.maxIdleTimeBeforeTerminal = inputMaxIdleTime; + } + + enum EnableQuickAutoBalanceParams { + TRUE("true"), + FALSE("false"); + + private final String value; + + EnableQuickAutoBalanceParams(String value) { + this.value = value; + } + + /** + * Get value. + * + * @return value + */ + public String getValue() { + return this.value; + } + } + + public StatementCancelState getConnectionState() { + return connectionState; + } + + public synchronized void setConnectionState(StatementCancelState state) { + if (state != null && !connectionState.equals(state)) { + connectionState = state; + stateLastChangedTimeStamp = System.currentTimeMillis(); + } + } + + public long getMaxIdleTimeBeforeTerminal() { + return maxIdleTimeBeforeTerminal; + } + + public String getAutoBalance() { + return autoBalance; + } + + public PgConnection getPgConnection() { + return pgConnection; + } + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + final ConnectionInfo that = (ConnectionInfo) o; + return createTimeStamp == that.createTimeStamp && enableQuickAutoBalance == that.enableQuickAutoBalance && + maxIdleTimeBeforeTerminal == that.maxIdleTimeBeforeTerminal && pgConnection.equals(that.pgConnection) && + autoBalance.equals(that.autoBalance) && hostSpec.equals(that.hostSpec); + } + + /** + * Check whether the connection can be closed. + * The judgement conditions are as follows: + * 1. The connection enables quickAutoBalance. + * 2. The quickAutoBalance start time is later than the connection create time. + * 3. The connection state is idle. + * 4. The connection keeps idle at least maxIdleTimeBeforeTerminal seconds. + * + * @param quickAutoBalanceStartTime quickAutoBalance start time + * @return whether the connection can be closed + */ + public synchronized boolean checkConnectionCanBeClosed(long quickAutoBalanceStartTime) { + if (pgConnection == null) { + return false; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(GT.tr("checkConnectionCanBeClosed: server ip={0}, enableQuickAutoBalance={1}, " + + "quickAutoBalanceStartTime={2}, createTimeStamp={3}, connectionState={4}, " + + "stateLastChangedTimeStamp={5}, currentTimeMillis={6}", + hostSpec.toString(), isEnableQuickAutoBalance(), quickAutoBalanceStartTime, createTimeStamp, + connectionState, stateLastChangedTimeStamp, System.currentTimeMillis())); + } + if (!isEnableQuickAutoBalance()) { + return false; + } + if (quickAutoBalanceStartTime < createTimeStamp) { + return false; + } + if (!connectionState.equals(StatementCancelState.IDLE)) { + return false; + } + return System.currentTimeMillis() - stateLastChangedTimeStamp > maxIdleTimeBeforeTerminal * 1000; + } + + public boolean isEnableQuickAutoBalance() { + return enableQuickAutoBalance; + } + + /** + * Check whether a connection is valid. + * + * @return whether a connection is valid + */ + public boolean checkConnectionIsValid() { + boolean isConnectionValid; + try { + QueryExecutor queryExecutor = pgConnection.getQueryExecutor(); + byte[][] bit = SetupQueryRunner.run(queryExecutor, "select 1", true); + if (bit == null) { + return false; + } + String result = queryExecutor.getEncoding().decode(bit[0]); + isConnectionValid = result != null && result.equals("1"); + } catch (SQLException | IOException e) { + LOGGER.info(GT.tr("Check connection isValid failed.")); + isConnectionValid = false; + } + return isConnectionValid; + } + + /** + * get hostSpec + * + * @return hostSpec + */ + public HostSpec getHostSpec() { + return hostSpec; + } +} diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionManager.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionManager.java new file mode 100644 index 0000000..ce7953b --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ConnectionManager.java @@ -0,0 +1,295 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.Driver; +import org.postgresql.QueryCNListUtils; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Connection manager of quick load balancing. + */ +public class ConnectionManager { + private static final Log LOGGER = Logger.getLogger(ConnectionManager.class.getName()); + + private static final String AUTO_BALANCE = "autoBalance"; + + private static final String LEAST_CONN = "leastconn"; + + private final Map cachedClusters; + + private ConnectionManager() { + this.cachedClusters = new ConcurrentHashMap<>(); + } + + /** + * Choose abandoned connections from abandonedConnections of each cluster. + * The number of connections that each cluster closes : CLOSE_CONNECTION_PERIOD * CLOSE_CONNECTION_PER_SECOND. + * + * @return the number of connections closed per cluster + */ + public List closeConnections() { + List ans = new ArrayList<>(); + for (Entry entry : cachedClusters.entrySet()) { + Cluster cluster = entry.getValue(); + int num = 0; + num += cluster != null ? cluster.closeConnections() : 0; + ans.add(num); + } + return ans; + } + + /** + * check whether the properties enable leastconn. + * + * @param properties properties + * @return whether the properties enable leastconn. + */ + public static boolean checkEnableLeastConn(Properties properties) { + if (properties == null) { + return false; + } + if (!LEAST_CONN.equals(properties.getProperty(AUTO_BALANCE, ""))) { + return false; + } + HostSpec[] hostSpecs = Driver.getURLHostSpecs(properties); + if (hostSpecs.length <= 1) { + return false; + } + return true; + } + + private static class Holder { + private static final ConnectionManager INSTANCE = new ConnectionManager(); + } + + public static ConnectionManager getInstance() { + return Holder.INSTANCE; + } + + /** + * Set cluster into connection manager. + * + * @param properties properties + * @return set or not. + */ + public boolean setCluster(Properties properties) throws PSQLException { + if (!checkEnableLeastConn(properties)) { + return false; + } + String urlIdentifier = QueryCNListUtils.keyFromURL(properties); + // create a cluster if it doesn't exist in cachedClusters. + if (!cachedClusters.containsKey(urlIdentifier)) { + synchronized (cachedClusters) { + if (!cachedClusters.containsKey(urlIdentifier)) { + Cluster cluster = new Cluster(urlIdentifier, properties); + cachedClusters.put(urlIdentifier, cluster); + return true; + } else { + return false; + } + } + } else { + return false; + } + } + + /** + * Cache this connection, if it's configured with autoBalance = "leastconn". + * + * @param pgConnection connection + * @param properties properties + * @return if insert the connection into connectionManager. + */ + public boolean setConnection(PgConnection pgConnection, Properties properties) throws PSQLException { + if (!checkEnableLeastConn(properties)) { + return false; + } + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + // create a cluster if it doesn't exist in cachedClusters. + if (!cachedClusters.containsKey(URLIdentifier)) { + synchronized (cachedClusters) { + if (!cachedClusters.containsKey(URLIdentifier)) { + Cluster cluster = new Cluster(URLIdentifier, properties); + cluster.setConnection(pgConnection, properties); + cachedClusters.put(URLIdentifier, cluster); + } + } + } else { + cachedClusters.get(URLIdentifier).setConnection(pgConnection, properties); + } + return true; + } + + /** + * Set connection state of cached connections if it exists. + * + * @param pgConnection pgConnection + * @param state state + */ + public void setConnectionState(PgConnection pgConnection, StatementCancelState state) throws PSQLException { + String url; + try { + url = pgConnection.getURL(); + } catch (SQLException e) { + LOGGER.error(GT.tr("Can't get url from pgConnection.")); + return; + } + String URLIdentifier = getURLIdentifierFromUrl(url); + Cluster cluster = cachedClusters.get(URLIdentifier); + if (cluster != null) { + cluster.setConnectionState(pgConnection, state); + } + } + + public Cluster getCluster(String URLIdentifier) { + return cachedClusters.get(URLIdentifier); + } + + /** + * Get URLIdentifier from url, which is a unique id of cluster. + * + * @param url url + * @return URLIdentifier + */ + public static String getURLIdentifierFromUrl(String url) throws PSQLException { + HostSpec[] hostSpecs; + try { + String pgHostUrl = url.split("//")[1].split("/")[0]; + String[] pgHosts = pgHostUrl.split(","); + hostSpecs = new HostSpec[pgHosts.length]; + for (int i = 0; i < hostSpecs.length; i++) { + hostSpecs[i] = new HostSpec(pgHosts[i].split(":")[0], + Integer.parseInt(pgHosts[i].split(":")[1])); + } + Arrays.sort(hostSpecs); + + } catch (ArrayIndexOutOfBoundsException e) { + throw new PSQLException( + GT.tr("Parsed url={0} failed.", url), PSQLState.INVALID_PARAMETER_VALUE); + } + return Arrays.toString(hostSpecs); + } + + /** + * Check whether the connections are valid in each cluster, and remove invalid connections. + * + * @return the number of connections removed from cache. + */ + public List checkConnectionsValidity() { + List ans = new ArrayList<>(); + for (Entry entry : cachedClusters.entrySet()) { + Cluster cluster = entry.getValue(); + int num = 0; + if (cluster != null) { + List removes = cluster.checkConnectionsValidity(); + for (int remove : removes) { + num += remove; + } + } + ans.add(num); + } + return ans; + } + + /** + * Check datanode states of each cluster. + * + * @return the number of invalid data nodes of all cluster + */ + public int checkClusterStates() { + int invalidDataNodes = 0; + for (Entry entry : cachedClusters.entrySet()) { + Cluster cluster = entry.getValue(); + if (cluster != null) { + invalidDataNodes += cluster.checkClusterState(); + } + } + return invalidDataNodes; + } + + /** + * Increment cachedCreatingConnectionSize. + * CachedCreatingConnectionSize indicates the number of connections that have been load balanced, + * but haven't been cached into cachedConnectionList yet. + * + * @param hostSpec hostSpec + * @param properties properties + * @return cachedCreatingConnectionSize after updating + */ + public int incrementCachedCreatingConnectionSize(HostSpec hostSpec, Properties properties) { + if (!checkEnableLeastConn(properties)) { + return 0; + } + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + if (cachedClusters.containsKey(URLIdentifier)) { + Cluster cluster = cachedClusters.get(URLIdentifier); + if (cluster != null) { + return cluster.incrementCachedCreatingConnectionSize(hostSpec); + } + } else { + LOGGER.info(GT.tr("Can not find cluster: {0} in cached clusters.", URLIdentifier)); + } + return 0; + } + + /** + * Decrement cachedCreatingConnectionSize. + * CachedCreatingConnectionSize indicates the number of connections that have been load balanced, + * but haven't been cached into cachedConnectionList yet. + * + * @param hostSpec hostSpec + * @param properties properties + * @return cachedCreatingConnectionSize after updating + */ + public int decrementCachedCreatingConnectionSize(HostSpec hostSpec, Properties properties) { + if (!checkEnableLeastConn(properties)) { + return 0; + } + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + if (cachedClusters.containsKey(URLIdentifier)) { + Cluster cluster = cachedClusters.get(URLIdentifier); + if (cluster != null) { + return cluster.decrementCachedCreatingConnectionSize(hostSpec); + } + } else { + LOGGER.info(GT.tr("Can not find cluster: {0} in cached clusters.", URLIdentifier)); + } + return 0; + } + + public void clear() { + synchronized (cachedClusters) { + cachedClusters.clear(); + } + } +} \ No newline at end of file diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/DataNode.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/DataNode.java new file mode 100644 index 0000000..7fec380 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/DataNode.java @@ -0,0 +1,361 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.PGProperty; +import org.postgresql.core.PGStream; +import org.postgresql.core.QueryExecutor; +import org.postgresql.core.SocketFactoryFactory; +import org.postgresql.core.v3.ConnectionFactoryImpl; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.SslMode; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import javax.net.SocketFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Datanode. + */ +public class DataNode { + private static Log LOGGER = Logger.getLogger(DataNode.class.getName()); + + private static final String USERNAME_OR_PASSWORD_INVALID_ERROR_CODE = "28P01"; + + // the host of datanode (ip + port) + private final HostSpec hostSpec; + + // cached connections + private final Map cachedConnectionList; + + // number of cached connections, before set into cachedConnectionList, and after load balance by leastconn. + private final AtomicInteger cachedCreatingConnectionSize; + + private volatile boolean dataNodeState; + + public DataNode(final HostSpec hostSpec) { + this.hostSpec = hostSpec; + this.cachedConnectionList = new ConcurrentHashMap<>(); + this.cachedCreatingConnectionSize = new AtomicInteger(0); + this.dataNodeState = true; + } + + /** + * Set connection state. + * + * @param pgConnection pgConnection + * @param state state + */ + public void setConnectionState(final PgConnection pgConnection, final StatementCancelState state) { + ConnectionInfo connectionInfo = cachedConnectionList.get(pgConnection); + if (connectionInfo != null) { + connectionInfo.setConnectionState(state); + } + } + + /** + * Set connection. + * + * @param pgConnection pgConnection + * @param properties properties + * @param hostSpec hostSpec + */ + public void setConnection(final PgConnection pgConnection, final Properties properties, final HostSpec hostSpec) + throws PSQLException { + if (pgConnection == null || properties == null || hostSpec == null) { + return; + } + if (!hostSpec.equals(this.hostSpec)) { + return; + } + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + cachedConnectionList.put(pgConnection, connectionInfo); + } + + /** + * Get connection info if the connection exits. + * + * @param pgConnection pgConnection + * @return connectionInfo + */ + public ConnectionInfo getConnectionInfo(PgConnection pgConnection) { + if (pgConnection == null) { + return null; + } + return cachedConnectionList.get(pgConnection); + } + + /** + * Get the size of cachedConnectionList. + * + * @return size of cachedConnectionList + */ + public int getCachedConnectionListSize() { + return cachedConnectionList.size(); + } + + /** + * Check dn state and the validity of properties. + * + * @param properties properties + * @return result (dnValid, dnInvalid, propertiesInvalid) + */ + public CheckDnStateResult checkDnStateAndProperties(Properties properties) { + boolean isDataNodeValid; + Properties singleNodeProperties = new Properties(); + PGProperty.USER.set(singleNodeProperties, PGProperty.USER.get(properties)); + PGProperty.PASSWORD.set(singleNodeProperties, PGProperty.PASSWORD.get(properties)); + PGProperty.PG_DBNAME.set(singleNodeProperties, PGProperty.PG_DBNAME.get(properties)); + PGProperty.PG_HOST.set(singleNodeProperties, hostSpec.getHost()); + PGProperty.PG_PORT.set(singleNodeProperties, hostSpec.getPort()); + try { + isDataNodeValid = checkDnState(singleNodeProperties); + } catch (PSQLException e) { + String cause = e.getCause() != null ? e.getCause().getMessage() : ""; + LOGGER.info(GT.tr("Can not try connect to dn: {0}, {1}.", hostSpec.toString(), cause.toString())); + return CheckDnStateResult.DN_INVALID; + } catch (InvocationTargetException e) { + Throwable invocationTargetExceptionCause = e.getCause(); + if (invocationTargetExceptionCause instanceof PSQLException) { + PSQLException psqlException = (PSQLException) invocationTargetExceptionCause; + String sqlState = psqlException.getSQLState(); + if (USERNAME_OR_PASSWORD_INVALID_ERROR_CODE.equals(sqlState)) { + String cause = e.getCause() != null ? e.getCause().getMessage() : ""; + LOGGER.info(GT.tr("Cached properties is invalid: {0}.", cause.toString())); + return CheckDnStateResult.PROPERTIES_INVALID; + } + } + String cause = e.getCause() != null ? e.getCause().getMessage() : ""; + LOGGER.info(GT.tr("Can not try connect to dn: {0}, {1}.", hostSpec.toString(), cause.toString())); + return CheckDnStateResult.DN_INVALID; + } + if (isDataNodeValid) { + return CheckDnStateResult.DN_VALID; + } else { + return CheckDnStateResult.DN_INVALID; + } + } + + /** + * Filter idle connections from cachedConnectionsList. + * + * @param quickAutoBalanceStartTime the time since the start of quickAutoBalance + * @return idle Connection list + */ + public List filterIdleConnections(final long quickAutoBalanceStartTime) { + synchronized (cachedConnectionList) { + List idleConnectionList = new ArrayList<>(); + for (Entry entry : cachedConnectionList.entrySet()) { + ConnectionInfo connectionInfo = entry.getValue(); + if (connectionInfo != null && connectionInfo.checkConnectionCanBeClosed(quickAutoBalanceStartTime)) { + idleConnectionList.add(connectionInfo); + } + } + return idleConnectionList; + } + } + + /** + * The result of checking dn state. + */ + public enum CheckDnStateResult { + DN_VALID, + DN_INVALID, + PROPERTIES_INVALID + } + + public void setDataNodeState(boolean isDnValid) { + this.dataNodeState = isDnValid; + } + + public boolean getDataNodeState() { + return this.dataNodeState; + } + + /** + * check a dn of the cluster if valid, + * + * @param properties properties + * @return if the dn is valid. + * @throws PSQLException psql exception + * @throws InvocationTargetException invocation target exception + */ + public boolean checkDnState(Properties properties) throws PSQLException, InvocationTargetException { + Object pgStream = new Object(); + try { + HostSpec dnHostSpec = new HostSpec(properties.getProperty("PGHOST") + , Integer.parseInt(properties.getProperty("PGPORT"))); + SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(properties); + SslMode sslMode = SslMode.of(properties); + Class classForName = Class.forName("org.postgresql.core.v3.ConnectionFactoryImpl"); + Object object = classForName.newInstance(); + if (!(object instanceof ConnectionFactoryImpl)) { + LOGGER.error(GT.tr("classForName.newInstance() doesn't instanceof ConnectionFactoryImpl.")); + return false; + } + ConnectionFactoryImpl connectionFactory = (ConnectionFactoryImpl) object; + Method method = connectionFactory.getClass().getDeclaredMethod("tryConnect", String.class, + String.class, Properties.class, SocketFactory.class, HostSpec.class, SslMode.class); + method.setAccessible(true); + pgStream = method.invoke(connectionFactory, properties.getProperty("user"), + properties.getProperty("PGDBNAME"), properties, socketFactory, dnHostSpec, sslMode); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | + ClassNotFoundException e) { + throw new PSQLException("The queryExecutor of connection can't execute tryConnect", + PSQLState.WRONG_OBJECT_TYPE); + } + + if (pgStream instanceof PGStream) { + return true; + } else { + LOGGER.error(GT.tr("Stream doesn't instanceof PGStream.")); + return false; + } + } + + + /** + * Check cached connections validity, and remove invalid connections. + * + * @return the amount of removed connections. + */ + public int checkConnectionsValidity() { + int num = 0; + for (Entry entry : cachedConnectionList.entrySet()) { + PgConnection pgConnection = entry.getKey(); + ConnectionInfo connectionInfo = entry.getValue(); + if (!connectionInfo.checkConnectionIsValid()) { + cachedConnectionList.remove(pgConnection); + num++; + } + } + return num; + } + + /** + * Close cached connections, and clear cachedConnectionList. + * JDBC execute clearCachedConnections when jdbc find an invalid datanode. + * + * @return size of cachedConnectionList before cleared + */ + public int clearCachedConnections() { + synchronized (cachedConnectionList) { + int num = cachedConnectionList.size(); + for (Map.Entry entry : cachedConnectionList.entrySet()) { + PgConnection pgConnection = entry.getKey(); + if (pgConnection != null) { + QueryExecutor queryExecutor = pgConnection.getQueryExecutor(); + if (queryExecutor != null && !queryExecutor.isClosed()) { + queryExecutor.setAvailability(false); + } + } else { + LOGGER.error(GT.tr("Fail to close connection, pgConnection = null.")); + } + } + cachedConnectionList.clear(); + return num; + } + } + + /** + * Close connection. + * + * @param pgConnection pgConnection + * @return if closed + */ + public boolean closeConnection(PgConnection pgConnection) { + if (pgConnection == null) { + return false; + } + ConnectionInfo connectionInfo = cachedConnectionList.remove(pgConnection); + if (connectionInfo != null) { + try { + pgConnection.close(); + return true; + } catch (SQLException e) { + LOGGER.info(GT.tr("Connection closed failed."), e); + return false; + } + } + return false; + } + + /** + * get cachedCreatingConnectionSize + * + * @return cachedCreatingConnectionSize + */ + public int getCachedCreatingConnectionSize() { + return cachedCreatingConnectionSize.get(); + } + + /** + * increment cachedCreatingConnectionSize + * + * @return cachedCreatingConnectionSize after updated + */ + public int incrementCachedCreatingConnectionSize() { + return cachedCreatingConnectionSize.incrementAndGet(); + } + + /** + * decrement cachedCreatingConnectionSize + * + * @return cachedCreatingConnectionSize after updated + */ + public int decrementCachedCreatingConnectionSize() { + if (cachedCreatingConnectionSize.get() == 0) { + // Some of junit tests don't load balance, but setConnection, can generate this error. + LOGGER.error(GT.tr("CachedCreatingConnectionSize should not be less than 0, reset to 0.")); + return 0; + } + return cachedCreatingConnectionSize.decrementAndGet(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final DataNode dataNode = (DataNode) obj; + return dataNodeState == dataNode.dataNodeState && Objects.equals(hostSpec, dataNode.hostSpec) && + Objects.equals(cachedConnectionList, dataNode.cachedConnectionList); + } + + @Override + public int hashCode() { + return Objects.hash(hostSpec); + } +} diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/LoadBalanceHeartBeating.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/LoadBalanceHeartBeating.java new file mode 100644 index 0000000..fbdb3c3 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/LoadBalanceHeartBeating.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.PGProperty; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.util.GT; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Load balance heartBeating. + */ +public class LoadBalanceHeartBeating { + + private static final int INITIAL_DELAY = 1000; + + private static final int CHECK_CLUSTER_STATE_PERIOD = 1000 * 20; + + // unit: CLOSE_CONNECTION_PER_SECOND + private static final int CLOSE_CONNECTION_PERIOD = 1000 * 5; + + // A heartBeating thread used to check clusters' state. + // If user has configured 'autoBalance=leastconn', jdbc will start checkConnectionScheduledExecutorService. + private static final ScheduledExecutorService checkClusterStateScheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(r -> new Thread(r, "loadBalanceHeartBeatingThread")); + + // A heartBeating thread used to close abandoned connections. + // If user has configured 'autoBalance=leastconn&enableQuickAutoBalance=true', jdbc will start closeConnectionExecutorService. + private static final ScheduledExecutorService closeConnectionExecutorService = Executors + .newSingleThreadScheduledExecutor(r -> new Thread(r, "closeConnectionsHeartBeatingThread")); + private static Log LOGGER = Logger.getLogger(LoadBalanceHeartBeating.class.getName()); + private static volatile ScheduledFuture checkClusterStateScheduledFuture = null; + + private static volatile ScheduledFuture closeConnectionScheduledFuture = null; + + // If singleton checkConnectionScheduledExecutorService has started. + private static volatile boolean leastConnStarted = false; + + // If singleton closeConnectionExecutorService has started. + private static volatile boolean quickAutoBalanceStarted = false; + + /** + * Whether quickAutoBalance has started. + * + * @return whether quickAutoBalance has started + */ + public static boolean isLoadBalanceHeartBeatingStarted() { + return leastConnStarted && quickAutoBalanceStarted; + } + + /** + * Get if quickAutoBalance has started. + * + * @return if quickAutoBalance has started + */ + public static boolean isQuickAutoBalanceStarted() { + return quickAutoBalanceStarted; + } + + /** + * Get if leastConnStarted has started. + * + * @return if leastConnStarted has started + */ + public static boolean isLeastConnStarted() { + return leastConnStarted; + } + + /** + * Start scheduled executor service. There are two singleton scheduled executor service. + * If user has configured 'autoBalance=leastconn', jdbc will start checkConnectionScheduledExecutorService. + * If user has configured 'autoBalance=leastconn&enableQuickAutoBalance=true', jdbc will start closeConnectionExecutorService. + * + * @param properties properties + */ + public static void startScheduledExecutorService(Properties properties) { + if (!leastConnStarted) { + if (ConnectionManager.checkEnableLeastConn(properties)) { + synchronized (LoadBalanceHeartBeating.class) { + if (!leastConnStarted) { + leastConnStarted = true; + checkClusterStateScheduledFuture = checkClusterStateScheduledExecutorService + .scheduleAtFixedRate(LoadBalanceHeartBeating::checkClusterStateScheduleTask, + INITIAL_DELAY, CHECK_CLUSTER_STATE_PERIOD, TimeUnit.MILLISECONDS); + LOGGER.info(GT.tr("Start scheduleExecutorService, period:{0} milliseconds.", + CHECK_CLUSTER_STATE_PERIOD)); + } + } + } + } + if (!quickAutoBalanceStarted) { + if (ConnectionManager.checkEnableLeastConn(properties) + && ConnectionInfo.ENABLE_QUICK_AUTO_BALANCE_PARAMS.equals(PGProperty.ENABLE_QUICK_AUTO_BALANCE.get(properties))) { + synchronized (LoadBalanceHeartBeating.class) { + if (!quickAutoBalanceStarted) { + quickAutoBalanceStarted = true; + closeConnectionScheduledFuture = closeConnectionExecutorService + .scheduleAtFixedRate(LoadBalanceHeartBeating::closeAbandonedConnections, + INITIAL_DELAY, CLOSE_CONNECTION_PERIOD, TimeUnit.MILLISECONDS); + LOGGER.info(GT.tr("Start closeConnectionScheduledFuture, period:{0} milliseconds.", + CLOSE_CONNECTION_PERIOD)); + } + } + } + } + } + + private static void checkClusterStateScheduleTask() { + checkClusterState(); + checkConnectionValidity(); + } + + private static void closeAbandonedConnections() { + List closedConnections = ConnectionManager.getInstance().closeConnections(); + int sum = closedConnections.stream().mapToInt(Integer::intValue).sum(); + LOGGER.info(GT.tr("Scheduled task: closeAbandonedConnections(), thread id: {0}, " + + "amount of closed connections: {1}.", Thread.currentThread().getId(), sum)); + } + + private static void checkClusterState() { + int invalidDataNodes = ConnectionManager.getInstance().checkClusterStates(); + LOGGER.info(GT.tr("Scheduled task: checkClusterState(), thread id: {0}, " + + "amount of invalid data nodes: {1}.", Thread.currentThread().getId(), invalidDataNodes)); + } + + private static void checkConnectionValidity() { + List removes = ConnectionManager.getInstance().checkConnectionsValidity(); + int sum = removes.stream().mapToInt(Integer::intValue).sum(); + LOGGER.info(GT.tr("Scheduled task: checkConnectionValidity(), thread id: {0}, " + + "amount of removed connections: {1}.", Thread.currentThread().getId(), sum)); + } + + /** + * Stop scheduled executor service. + */ + public static void stop() { + if (checkClusterStateScheduledFuture != null || closeConnectionScheduledFuture != null) { + synchronized (LoadBalanceHeartBeating.class) { + if (checkClusterStateScheduledFuture != null) { + checkClusterStateScheduledFuture.cancel(true); + leastConnStarted = false; + } + if (closeConnectionScheduledFuture != null) { + closeConnectionScheduledFuture.cancel(true); + quickAutoBalanceStarted = false; + } + } + } + } +} diff --git a/pgjdbc/src/main/java/org/postgresql/quickautobalance/ReflectUtil.java b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ReflectUtil.java new file mode 100644 index 0000000..79db410 --- /dev/null +++ b/pgjdbc/src/main/java/org/postgresql/quickautobalance/ReflectUtil.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.quickautobalance; + +import org.postgresql.log.Log; +import org.postgresql.log.Logger; + +import java.lang.reflect.Field; + +/** + * Reflect util + */ +public class ReflectUtil { + private static Log LOGGER = Logger.getLogger(ReflectUtil.class.getName()); + + /** + * Get the private property of an object. + * + * @param classz class of object + * @param object object + * @param t class of the private property + * @param fieldName of the private property + * @param class of the private property + * @return the private property + */ + public static T getField(Class classz, Object object, Class t, String fieldName) { + try { + Field field = classz.getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(object); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOGGER.error("get reflect field " + classz + "." + fieldName + " error."); + } + return null; + } + + /** + * Set the private property of an object. + * + * @param classz class of object + * @param object object + * @param fieldName of the private property + * @param value value of the private property + */ + public static void setField(Class classz, Object object, String fieldName, Object value) { + try { + Field field = classz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(object, value); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOGGER.error("set reflect field " + classz + "." + fieldName + " error."); + } + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/test/TestUtil.java b/pgjdbc/src/test/java/org/postgresql/test/TestUtil.java index 01751f9..b459d77 100644 --- a/pgjdbc/src/test/java/org/postgresql/test/TestUtil.java +++ b/pgjdbc/src/test/java/org/postgresql/test/TestUtil.java @@ -108,14 +108,42 @@ public class TestUtil { public static String getServer() { return System.getProperty("server", "localhost"); } - + + /* + * Returns the Secondary Test server + */ + public static String getSecondaryServer() { + return System.getProperty("secondaryServer", "localhost"); + } + + /* + * Returns the third Test server + */ + public static String getSecondaryServer2() { + return System.getProperty("secondaryServer2", "localhost"); + } + /* * Returns the Test port */ public static int getPort() { return Integer.parseInt(System.getProperty("port", System.getProperty("def_pgport"))); } - + + /* + * Returns the Secondary Test port + */ + public static int getSecondaryPort() { + return Integer.parseInt(System.getProperty("secondaryPort", System.getProperty("def_pgport"))); + } + + /* + * Returns the Third Test port + */ + public static int getSecondaryServerPort2() { + return Integer.parseInt(System.getProperty("secondaryServerPort2", System.getProperty("def_pgport"))); + } + /* * Returns the server side prepared statement threshold. */ diff --git a/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ClusterTest.java b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ClusterTest.java new file mode 100644 index 0000000..d56612a --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ClusterTest.java @@ -0,0 +1,851 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.test.quickautobalance; + +import org.junit.Test; +import org.postgresql.QueryCNListUtils; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.quickautobalance.Cluster; +import org.postgresql.quickautobalance.ConnectionInfo; +import org.postgresql.quickautobalance.ConnectionManager; +import org.postgresql.quickautobalance.DataNode; +import org.postgresql.quickautobalance.ReflectUtil; +import org.postgresql.test.TestUtil; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Cluster test + */ +public class ClusterTest { + private static Log LOGGER = Logger.getLogger(ClusterTest.class.getName()); + + private static final String FAKE_HOST = "127.127.217.217"; + + private static final String FAKE_PORT = "1"; + + private static final String FAKE_USER = "fakeuser"; + + private static final String FAKE_PASSWORD = "fakepassword"; + + private static final int DN_NUM = 3; + + private PgConnection getConnection(String url, Properties properties) throws SQLException { + return DriverManager.getConnection(url, properties).unwrap(PgConnection.class); + } + + private HostSpec[] initHostSpecs() { + HostSpec[] hostSpecs = new HostSpec[DN_NUM]; + hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort()); + hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2()); + return hostSpecs; + } + + private HostSpec[] initHostSpecsWithInvalidNode() { + HostSpec[] hostSpecs = new HostSpec[DN_NUM]; + hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort()); + hostSpecs[2] = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT)); + return hostSpecs; + } + + private boolean checkHostSpecs(HostSpec[] hostSpecs) { + if (hostSpecs.length != DN_NUM) { + return false; + } + for (int i = 0; i < DN_NUM; i++) { + if ("localhost".equals(hostSpecs[i].getHost())) { + return false; + } + } + return true; + } + + private Properties initPriority(HostSpec[] hostSpecs) { + String host = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String port = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", host); + properties.setProperty("PGPORT", port); + properties.setProperty("PGPORTURL", port); + properties.setProperty("PGHOST", host); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + return properties; + } + + private String initURL(HostSpec[] hostSpecs) { + String host1 = hostSpecs[0].getHost() + ":" + hostSpecs[0].getPort(); + String host2 = hostSpecs[1].getHost() + ":" + hostSpecs[1].getPort(); + String host3 = hostSpecs[2].getHost() + ":" + hostSpecs[2].getPort(); + return "jdbc:postgresql://" + host1 + "," + host2 + "," + host3 + "/" + TestUtil.getDatabase(); + } + + @Test + public void checkConnectionsValidityTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + int total = 20; + int remove = 5; + List pgConnectionList = new ArrayList<>(); + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + pgConnectionList.add(pgConnection); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + List result = cluster.checkConnectionsValidity(); + int sum = result.stream().reduce(Integer::sum).orElse(0); + assertEquals(0, sum); + + for (int i = 0; i < remove; i++) { + try { + pgConnectionList.get(i).close(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + sum = 0; + result = cluster.checkConnectionsValidity(); + sum = result.stream().reduce(Integer::sum).orElse(0); + assertEquals(remove, sum); + ConnectionManager.getInstance().clear(); + } + + @Test + public void setMinReservedConPerClusterDefaultParamsTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + assertFalse(cluster.isEnableMinReservedConPerCluster()); + assertFalse(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 0); + assertEquals(cluster.getMinReservedConPerDatanode(), 0); + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerClusterParsedFailedTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "asafaas"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState()); + throw e; + } + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerClusterParsedTooSmallTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "-1"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerClusterParsedTooBigTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "200"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerDatanodeParsedFailedTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerDatanode", "asafaas"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState()); + throw e; + } + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerDatanodeParsedTooSmallTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerDatanode", "-1"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test (expected = SQLException.class) + public void setMinReservedConPerDatanodeParsedTooBigTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + String url1 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerDatanode", "200"); + try (PgConnection pgConnection = getConnection(url1, properties)) { + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test + public void updateParamsFailedTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + assertFalse(cluster.isEnableMinReservedConPerCluster()); + assertFalse(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 0); + assertEquals(cluster.getMinReservedConPerDatanode(), 0); + + String url2 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "40"); + properties.setProperty("minReservedConPerDatanode", "50"); + PgConnection pgConnection1 = getConnection(url2, properties); + cluster.setConnection(pgConnection1, properties); + assertTrue(cluster.isEnableMinReservedConPerCluster()); + assertTrue(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 40); + assertEquals(cluster.getMinReservedConPerDatanode(), 50); + pgConnection1.close(); + + String url3 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "60"); + properties.setProperty("minReservedConPerDatanode", "70"); + PgConnection pgConnection2 = getConnection(url3, properties); + cluster.setConnection(pgConnection2, properties); + assertTrue(cluster.isEnableMinReservedConPerCluster()); + assertTrue(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 40); + assertEquals(cluster.getMinReservedConPerDatanode(), 50); + pgConnection2.close(); + ConnectionManager.getInstance().clear(); + } + + @Test + public void setConnectionStateSuccessTest() throws SQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + assertFalse(cluster.isEnableMinReservedConPerCluster()); + assertFalse(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 0); + assertEquals(cluster.getMinReservedConPerDatanode(), 0); + + String url2 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "40"); + properties.setProperty("minReservedConPerDatanode", "50"); + PgConnection pgConnection1 = getConnection(url2, properties); + cluster.setConnection(pgConnection1, properties); + assertTrue(cluster.isEnableMinReservedConPerCluster()); + assertTrue(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 40); + assertEquals(cluster.getMinReservedConPerDatanode(), 50); + pgConnection1.close(); + + String url3 = initURL(hostSpecs) + "?autoBalance=leastconn"; + properties.setProperty("minReservedConPerCluster", "20"); + properties.setProperty("minReservedConPerDatanode", "30"); + PgConnection pgConnection2 = getConnection(url3, properties); + cluster.setConnection(pgConnection2, properties); + assertTrue(cluster.isEnableMinReservedConPerCluster()); + assertTrue(cluster.isEnableMinReservedConPerDatanode()); + assertEquals(cluster.getMinReservedConPerCluster(), 20); + assertEquals(cluster.getMinReservedConPerDatanode(), 30); + pgConnection2.close(); + ConnectionManager.getInstance().clear(); + } + + @Test + public void setConnectionTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + PgConnection pgConnection; + try { + pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + ConnectionInfo connectionInfo = cluster.getConnectionInfo(pgConnection); + assertEquals(pgConnection, connectionInfo.getPgConnection()); + cluster.setConnection(null, properties); + cluster.setConnection(pgConnection, null); + ConnectionManager.getInstance().clear(); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + private void setConnection(Cluster cluster, String ip, int port, Properties properties) { + String url = "jdbc:postgresql://" + ip + ":" + port + "/" + TestUtil.getDatabase(); + try { + final PgConnection pgConnection = getConnection(url, properties); + cluster.incrementCachedCreatingConnectionSize(new HostSpec(ip, port)); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void sortDnsByLeastConnTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + int num1 = 4; + int num2 = 6; + int num3 = 5; + for (int i = 0; i < num1; i++) { + setConnection(cluster, TestUtil.getServer(), TestUtil.getPort(), properties); + } + for (int i = 0; i < num2; i++) { + setConnection(cluster, TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort(), properties); + } + for (int i = 0; i < num3; i++) { + setConnection(cluster, TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2(), properties); + } + List result = cluster.sortDnsByLeastConn(Arrays.asList(hostSpecs)); + LOGGER.info(GT.tr("after sort: {0}", result.toString())); + assertEquals(TestUtil.getServer(), result.get(0).getHost()); + assertEquals(TestUtil.getPort(), result.get(0).getPort()); + assertEquals(TestUtil.getSecondaryServer(), result.get(2).getHost()); + assertEquals(TestUtil.getSecondaryPort(), result.get(2).getPort()); + assertEquals(TestUtil.getSecondaryServer2(), result.get(1).getHost()); + assertEquals(TestUtil.getSecondaryServerPort2(), result.get(1).getPort()); + ConnectionManager.getInstance().clear(); + } + + @Test + public void sortDnsByLeastConnWithOneNodeFailedTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + + int num1 = 4; + int num2 = 6; + int num3 = 5; + for (int i = 0; i < num1; i++) { + setConnection(cluster, TestUtil.getServer(), TestUtil.getPort(), properties); + } + for (int i = 0; i < num2; i++) { + setConnection(cluster, TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort(), properties); + } + for (int i = 0; i < num3; i++) { + setConnection(cluster, TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2(), properties); + } + Map cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class, "cachedDnList"); + DataNode dataNode = cachedDnList.getOrDefault(hostSpecs[0], null); + ReflectUtil.setField(DataNode.class, dataNode, "dataNodeState", false); + List result = cluster.sortDnsByLeastConn(Arrays.asList(hostSpecs)); + LOGGER.info(GT.tr("after sort: {0}", result.toString())); + assertEquals(TestUtil.getServer(), result.get(2).getHost()); + assertEquals(TestUtil.getPort(), result.get(2).getPort()); + assertEquals(TestUtil.getSecondaryServer(), result.get(1).getHost()); + assertEquals(TestUtil.getSecondaryPort(), result.get(1).getPort()); + assertEquals(TestUtil.getSecondaryServer2(), result.get(0).getHost()); + assertEquals(TestUtil.getSecondaryServerPort2(), result.get(0).getPort()); + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkDnStateSpecSuccessTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + String URLIdentifier = String.valueOf(hostSpecs); + String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties clusterProperties = new Properties(); + clusterProperties.setProperty("user", TestUtil.getUser()); + clusterProperties.setProperty("password", TestUtil.getPassword()); + clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + clusterProperties.setProperty("PGHOSTURL", pgHostUrl); + clusterProperties.setProperty("PGPORT", pgPortUrl); + clusterProperties.setProperty("PGPORTURL", pgPortUrl); + clusterProperties.setProperty("PGHOST", pgHostUrl); + Cluster cluster = new Cluster(URLIdentifier, clusterProperties); + for (int i = 0; i < DN_NUM; i++) { + assertTrue(cluster.checkDnState(hostSpecs[i])); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkDnStateUserOrPasswordErrorTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + String URLIdentifier = String.valueOf(hostSpecs); + String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties invalidProperties = new Properties(); + invalidProperties.setProperty("user", FAKE_USER); + invalidProperties.setProperty("password", FAKE_PASSWORD); + invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + invalidProperties.setProperty("PGHOSTURL", pgHostUrl); + invalidProperties.setProperty("PGPORT", pgPortUrl); + invalidProperties.setProperty("PGPORTURL", pgPortUrl); + invalidProperties.setProperty("PGHOST", pgHostUrl); + Cluster cluster = new Cluster(URLIdentifier, invalidProperties); + for (int i = 0; i < DN_NUM; i++) { + assertFalse(cluster.checkDnState(hostSpecs[i])); + } + + Properties validProperties = new Properties(); + validProperties.setProperty("user", TestUtil.getUser()); + validProperties.setProperty("password", TestUtil.getPassword()); + validProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + validProperties.setProperty("PGHOSTURL", pgHostUrl); + validProperties.setProperty("PGPORT", pgPortUrl); + validProperties.setProperty("PGPORTURL", pgPortUrl); + validProperties.setProperty("PGHOST", pgHostUrl); + cluster.setProperties(invalidProperties); + cluster.setProperties(validProperties); + for (int i = 0; i < DN_NUM; i++) { + assertTrue(cluster.checkDnState(hostSpecs[i])); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkDnStateConnectFailedTest() throws PSQLException { + HostSpec[] hostSpecs = new HostSpec[DN_NUM]; + for (int i = 0; i < DN_NUM; i++) { + hostSpecs[i] = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT)); + } + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + String URLIdentifier = Arrays.toString(hostSpecs); + String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties invalidProperties = new Properties(); + invalidProperties.setProperty("user", TestUtil.getUser()); + invalidProperties.setProperty("password", TestUtil.getPassword()); + invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + invalidProperties.setProperty("PGHOSTURL", pgHostUrl); + invalidProperties.setProperty("PGPORT", pgPortUrl); + invalidProperties.setProperty("PGPORTURL", pgPortUrl); + invalidProperties.setProperty("PGHOST", pgHostUrl); + Cluster cluster = new Cluster(URLIdentifier, invalidProperties); + for (int i = 0; i < DN_NUM; i++) { + assertFalse(cluster.checkDnState(hostSpecs[i])); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkConnectionStateSuccessTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + int total = 10; + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + int invalidDn = cluster.checkClusterState(); + assertEquals(0, invalidDn); + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkConnectionStateOneNodeFailedTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecsWithInvalidNode(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + int total = 10; + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + int invalidDn = cluster.checkClusterState(); + assertEquals(1, invalidDn); + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkConnectionStateAndQuickLoadBalanceTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + properties.setProperty("enableQuickAutoBalance", "true"); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + int total = 9; + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + try { + Thread.sleep(1100); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + // set dnState='false' of dn1 + + Map cachedDnList = ReflectUtil.getField(Cluster.class, cluster, + Map.class, "cachedDnList"); + DataNode dataNode = cachedDnList.get(hostSpecs[0]); + dataNode.setDataNodeState(false); + int invalidDn = cluster.checkClusterState(); + assertEquals(0, invalidDn); + // check cluster state: jdbc will find dnState change to true from false, and execute quickAutoBalance. + Queue abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster, + Queue.class, "abandonedConnectionList"); + assertEquals(6, abandonedConnectionList.size()); + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkConnectionStateAndQuickLoadBalanceWithParamsTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + // init properties with quickLoadBalance. + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("minReservedConPerDatanode", "0"); + properties.setProperty("minReservedConPerCluster", "75"); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs) + "?autoBalance=leastconn"; + int total = 12; + // set connection + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + try { + Thread.sleep(1100); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + // set dnState='false' of dn1 + Map cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class, + "cachedDnList"); + DataNode dataNode = cachedDnList.get(hostSpecs[0]); + dataNode.setDataNodeState(false); + // check cluster state: jdbc will find dnState change to true from false, and execute quickAutoBalance. + int invalidDn = cluster.checkClusterState(); + assertEquals(0, invalidDn); + Queue abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster, + Queue.class, "abandonedConnectionList"); + assertEquals((int) (total / 3 * 2 * (100 - 75) / 100), abandonedConnectionList.size()); + ConnectionManager.getInstance().clear(); + } + + @Test + public void incrementCachedCreatingConnectionSizeTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + // init properties with quickLoadBalance. + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + Map cachedDnList = ReflectUtil.getField(Cluster.class, cluster, Map.class, + "cachedDnList"); + cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(1, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(2, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + cluster.incrementCachedCreatingConnectionSize(hostSpecs[1]); + assertEquals(1, cachedDnList.get(hostSpecs[1]).getCachedCreatingConnectionSize()); + assertEquals(0, cluster.incrementCachedCreatingConnectionSize(new HostSpec(FAKE_HOST, + Integer.parseInt(FAKE_PORT)))); + ConnectionManager.getInstance().clear(); + } + + @Test + public void decrementCachedCreatingConnectionSizeTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + // init properties with quickLoadBalance. + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + Map cachedDnList = ReflectUtil.getField(Cluster.class, cluster, + Map.class, "cachedDnList"); + cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]); + cluster.incrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(2, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(1, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(0, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + cluster.decrementCachedCreatingConnectionSize(hostSpecs[0]); + assertEquals(0, cachedDnList.get(hostSpecs[0]).getCachedCreatingConnectionSize()); + assertEquals(0, cluster.incrementCachedCreatingConnectionSize(new HostSpec(FAKE_HOST, + Integer.parseInt(FAKE_PORT)))); + ConnectionManager.getInstance().clear(); + } + + @Test + public void setPropertiesTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties1 = initPriority(hostSpecs); + properties1.setProperty("autoBalance", "leastconn"); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties1); + Cluster cluster = new Cluster(URLIdentifier, properties1); + List cachedPropertiesList = ReflectUtil.getField(Cluster.class, cluster, List.class, + "cachedPropertiesList"); + assertEquals(1, cachedPropertiesList.size()); + assertEquals(TestUtil.getUser(), cachedPropertiesList.get(0).getProperty("user", "")); + assertEquals(TestUtil.getPassword(), cachedPropertiesList.get(0).getProperty("password", "")); + Properties properties2 = initPriority(hostSpecs); + properties2.setProperty("user", "fakeuser"); + properties2.setProperty("password", "fakepassword"); + cluster.setProperties(properties2); + assertEquals(2, cachedPropertiesList.size()); + assertEquals("fakeuser", cachedPropertiesList.get(1).getProperty("user", "")); + assertEquals("fakepassword", cachedPropertiesList.get(1).getProperty("password", "")); + Properties properties3 = initPriority(hostSpecs); + properties3.setProperty("password", "fakepassword2"); + cluster.setProperties(properties3); + assertEquals(2, cachedPropertiesList.size()); + assertEquals(TestUtil.getUser(), cachedPropertiesList.get(0).getProperty("user", "")); + assertEquals("fakepassword2", cachedPropertiesList.get(0).getProperty("password", "")); + ConnectionManager.getInstance().clear(); + } + + @Test + public void closeConnectionTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + Arrays.sort(hostSpecs); + String URLIdentifier = QueryCNListUtils.keyFromURL(properties); + Cluster cluster = new Cluster(URLIdentifier, properties); + String url = initURL(hostSpecs); + int total = 50; + List pgConnectionList = new ArrayList<>(); + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(url, properties); + cluster.setConnection(pgConnection, properties); + pgConnectionList.add(pgConnection); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + try { + Thread.sleep(1100); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + Queue abandonedConnectionList = ReflectUtil.getField(Cluster.class, cluster, + Queue.class, "abandonedConnectionList"); + ReflectUtil.setField(Cluster.class, cluster, "totalAbandonedConnectionSize", total); + ReflectUtil.setField(Cluster.class, cluster, "quickAutoBalanceStartTime", Long.MAX_VALUE); + for (PgConnection pgConnection : pgConnectionList) { + ConnectionInfo connectionInfo = cluster.getConnectionInfo(pgConnection); + abandonedConnectionList.add(connectionInfo); + } + assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections()); + assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections()); + assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections()); + assertEquals((int) (Math.ceil((double) total * 0.2)), cluster.closeConnections()); + assertEquals(total - 4 * (int) (Math.ceil((double) total * 0.2)), cluster.closeConnections()); + ConnectionManager.getInstance().clear(); + } +} + diff --git a/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionInfoTest.java b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionInfoTest.java new file mode 100644 index 0000000..2758d9b --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionInfoTest.java @@ -0,0 +1,271 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.test.quickautobalance; + +import org.junit.Test; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.quickautobalance.ConnectionInfo; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * ConnectionInfo Test + */ +public class ConnectionInfoTest { + private HostSpec initHost() { + return new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + } + + private Properties initProperties() { + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + return properties; + } + + @Test + public void createConnectionInfoWithDefaultParamsTest() throws SQLException { + // ConnectionInfo without quickAutoBalance + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertEquals(connectionInfo.getPgConnection(), pgConnection); + assertEquals(connectionInfo.getHostSpec(), hostSpec); + assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IDLE); + assertEquals(connectionInfo.getMaxIdleTimeBeforeTerminal(), 30); + assertEquals(connectionInfo.getAutoBalance(), ""); + assertFalse(connectionInfo.isEnableQuickAutoBalance()); + pgConnection.close(); + } + + @Test(expected = SQLException.class) + public void createConnectionInfoEnableQuickAutoBalanceParsedFailedTest() throws SQLException { + Properties properties = initProperties(); + properties.setProperty("autoBalance", "luanma"); + properties.setProperty("enableQuickAutoBalance", "luanma"); + HostSpec hostSpec = initHost(); + try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class)) { + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + } catch (PSQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test + public void createConnectionInfoEnableQuickAutoBalanceFalseTest() throws SQLException { + Properties properties = initProperties(); + properties.setProperty("autoBalance", "luanma"); + properties.setProperty("enableQuickAutoBalance", "false"); + HostSpec hostSpec = initHost(); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertEquals("luanma", connectionInfo.getAutoBalance()); + assertFalse(connectionInfo.isEnableQuickAutoBalance()); + } + + @Test + public void createConnectionInfoSuccessTest() throws SQLException { + Properties properties = initProperties(); + HostSpec hostSpec = initHost(); + properties.setProperty("maxIdleTimeBeforeTerminal", "66"); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("enableQuickAutoBalance", "true"); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertEquals(connectionInfo.getMaxIdleTimeBeforeTerminal(), 66); + assertTrue(connectionInfo.isEnableQuickAutoBalance()); + assertEquals("leastconn", connectionInfo.getAutoBalance()); + pgConnection.close(); + } + + @Test(expected = PSQLException.class) + public void createConnectionInfoMaxIdleTimeBeforeTerminalParsedFailedTest() throws SQLException { + Properties properties = initProperties(); + properties.setProperty("maxIdleTimeBeforeTerminal", "111111@@"); + HostSpec hostSpec = initHost(); + try { + Connection connection = DriverManager.getConnection(TestUtil.getURL(), properties); + PgConnection pgConnection = connection.unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_TYPE.getState(), e.getSQLState()); + throw e; + } + } + + @Test(expected = SQLException.class) + public void createConnectionInfoMaxIdleTimeBeforeTerminalTooBigTest() throws SQLException { + Properties properties = initProperties(); + HostSpec hostSpec = initHost(); + properties.setProperty("maxIdleTimeBeforeTerminal", String.valueOf(Long.MAX_VALUE)); + properties.setProperty("enableQuickAutoBalance", "false"); + try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class)) { + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test(expected = SQLException.class) + public void createConnectionInfoMaxIdleTimeBeforeTerminalTooSmallTest() throws SQLException { + Properties properties = initProperties(); + HostSpec hostSpec = initHost(); + properties.setProperty("maxIdleTimeBeforeTerminal", String.valueOf(-100)); + properties.setProperty("enableQuickAutoBalance", "false"); + try (PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class)) { + new ConnectionInfo(pgConnection, properties, hostSpec); + } catch (SQLException e) { + e.printStackTrace(); + assertEquals(PSQLState.INVALID_PARAMETER_VALUE.getState(), e.getSQLState()); + throw e; + } + } + + @Test + public void checkConnectionCanBeClosedTestPgConnectionNullTest() throws InterruptedException, PSQLException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + PgConnection pgConnection; + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + Thread.sleep(1100); + ConnectionInfo connectionInfo = new ConnectionInfo(null, properties, hostSpec); + assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis())); + } + + @Test + public void checkConnectionCanBeClosedTestPgConnectionUnableQuickAutoBalance() throws SQLException, + InterruptedException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "false"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + Thread.sleep(1100); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis())); + pgConnection.close(); + } + + @Test + public void checkConnectionCanBeClosedTestStartTimeSmallerThanCreateTimeStamp() throws SQLException, + InterruptedException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + Thread.sleep(1100); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis() - 1000 * 10)); + pgConnection.close(); + } + + @Test + public void checkConnectionCanBeClosedTestConnectionStateNoEqualsToIDLE() throws SQLException, InterruptedException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + connectionInfo.setConnectionState(StatementCancelState.IN_QUERY); + Thread.sleep(1100); + assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis())); + pgConnection.close(); + } + + @Test + public void checkConnectionCanBeCloseTestIDLETimeToShort() throws SQLException, InterruptedException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "10"); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + Thread.sleep(1100); + assertFalse(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis())); + pgConnection.close(); + } + + @Test + public void checkConnectionCanBeCloseTestSuccessTest() throws SQLException, InterruptedException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties).unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + Thread.sleep(1100); + assertTrue(connectionInfo.checkConnectionCanBeClosed(System.currentTimeMillis())); + pgConnection.close(); + } + + @Test + public void checkConnectionIsValidTest() throws SQLException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + assertTrue(connectionInfo.checkConnectionIsValid()); + } + + @Test + public void checkConnectionIsValidFailedTest() throws SQLException { + HostSpec hostSpec = initHost(); + Properties properties = initProperties(); + PgConnection pgConnection = DriverManager.getConnection(TestUtil.getURL(), properties) + .unwrap(PgConnection.class); + ConnectionInfo connectionInfo = new ConnectionInfo(pgConnection, properties, hostSpec); + pgConnection.getQueryExecutor().setAvailability(false); + pgConnection.close(); + long before = System.currentTimeMillis(); + assertFalse(connectionInfo.checkConnectionIsValid()); + long after = System.currentTimeMillis(); + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionManagerTest.java b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionManagerTest.java new file mode 100644 index 0000000..3d43a18 --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/ConnectionManagerTest.java @@ -0,0 +1,320 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.test.quickautobalance; + +import org.junit.Test; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.log.Log; +import org.postgresql.log.Logger; +import org.postgresql.quickautobalance.Cluster; +import org.postgresql.quickautobalance.ConnectionInfo; +import org.postgresql.quickautobalance.ConnectionManager; +import org.postgresql.test.TestUtil; +import org.postgresql.util.GT; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * + */ +public class ConnectionManagerTest { + private static Log LOGGER = Logger.getLogger(ConnectionManagerTest.class.getName()); + + private static final String USER = TestUtil.getUser(); + + private static final String PASSWORD = TestUtil.getPassword(); + + private static final String MASTER_1 = TestUtil.getServer() + ":" + TestUtil.getPort(); + + private static final String SECONDARY_1 = TestUtil.getSecondaryServer() + ":" + TestUtil.getSecondaryPort(); + + private static final String SECONDARY_2 = TestUtil.getSecondaryServer2() + ":" + TestUtil.getSecondaryServerPort2(); + + private static final String DATABASE = TestUtil.getDatabase(); + + private static final int DN_NUM = 3; + + private String initURLWithLeastConn() { + return "jdbc:postgresql://" + MASTER_1 + "," + SECONDARY_1 + + "," + SECONDARY_2 + "/" + DATABASE + "?autoBalance=leastconn&loggerLevel=OFF"; + } + + private PgConnection getConnection(String url, String user, String password) throws SQLException { + return DriverManager.getConnection(url, user, password).unwrap(PgConnection.class); + } + + private HostSpec[] initHostSpec() { + HostSpec[] hostSpecs = new HostSpec[DN_NUM]; + hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort()); + hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2()); + return hostSpecs; + } + + private Properties initPriority(HostSpec[] hostSpecs) { + String host = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String port = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", host); + properties.setProperty("PGPORT", port); + properties.setProperty("PGPORTURL", port); + properties.setProperty("PGHOST", host); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + return properties; + } + + @Test + public void setConnectionTest() throws ClassNotFoundException, SQLException { + if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport")) + || String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) { + return; + } + Class.forName("org.postgresql.Driver"); + String url = initURLWithLeastConn(); + PgConnection connection1 = getConnection(url, USER, PASSWORD); + String urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + Cluster cluster = ConnectionManager.getInstance().getCluster(urlIdentifier); + assertNotEquals(cluster, null); + ConnectionInfo connectionInfo = cluster.getConnectionInfo(connection1); + assertNotEquals(connectionInfo, null); + assertEquals(connectionInfo.getPgConnection(), connection1); + + PgConnection connection2 = getConnection(url, USER, PASSWORD); + urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + cluster = ConnectionManager.getInstance().getCluster(urlIdentifier); + assertNotEquals(cluster, null); + connectionInfo = cluster.getConnectionInfo(connection2); + assertNotEquals(connectionInfo, null); + assertEquals(connectionInfo.getPgConnection(), connection2); + ConnectionManager.getInstance().clear(); + } + + @Test + public void setConnectionStateTest() throws ClassNotFoundException, SQLException { + if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport")) + || String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) { + return; + } + Class.forName("org.postgresql.Driver"); + String url = initURLWithLeastConn(); + // set connection state which exists. + PgConnection connection = getConnection(url, USER, PASSWORD); + ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.IDLE); + String urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + ConnectionInfo connectionInfo = ConnectionManager.getInstance() + .getCluster(urlIdentifier).getConnectionInfo(connection); + assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IDLE); + + ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.IN_QUERY); + urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection); + assertEquals(connectionInfo.getConnectionState(), StatementCancelState.IN_QUERY); + + ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.CANCELING); + urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection); + assertEquals(connectionInfo.getConnectionState(), StatementCancelState.CANCELING); + + ConnectionManager.getInstance().setConnectionState(connection, StatementCancelState.CANCELLED); + urlIdentifier = ConnectionManager.getURLIdentifierFromUrl(url); + connectionInfo = ConnectionManager.getInstance().getCluster(urlIdentifier).getConnectionInfo(connection); + assertEquals(connectionInfo.getConnectionState(), StatementCancelState.CANCELLED); + ConnectionManager.getInstance().clear(); + } + + @Test + public void leastConnTest() throws SQLException, ClassNotFoundException { + if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport")) + || String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) { + return; + } + Class.forName("org.postgresql.Driver"); + String url = initURLWithLeastConn(); + int num = 100; + HashMap dns = new HashMap<>(); + for (int i = 0; i < num; i++) { + PgConnection connection = getConnection(url, USER, PASSWORD); + String socketAddress = connection.getSocketAddress().split("/")[1]; + dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1); + } + List connectionCounts = new ArrayList<>(); + for (Entry entry : dns.entrySet()) { + connectionCounts.add(entry.getValue()); + } + for (Integer connectionCount : connectionCounts) { + LOGGER.info(GT.tr("{0}", connectionCount)); + } + for (int i = 0; i < connectionCounts.size() - 1; i++) { + assertTrue(Math.abs(connectionCounts.get(i) - connectionCounts.get(i + 1)) <= 1); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void leastConnIntoOneDnTest() throws SQLException, ClassNotFoundException { + if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport")) + || String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) { + return; + } + Class.forName("org.postgresql.Driver"); + String url1 = initURLWithLeastConn(); + url1 += "&targetServerType=slave"; + int num1 = 100; + HashMap dns = new HashMap<>(); + for (int i = 0; i < num1; i++) { + PgConnection connection = getConnection(url1, USER, PASSWORD); + String socketAddress = connection.getSocketAddress().split("/")[1]; + dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1); + } + LOGGER.info(GT.tr("create 100 connections to slave nodes.")); + for (Entry entry : dns.entrySet()) { + int num = entry.getValue(); + LOGGER.info(GT.tr("{0} : {1}", entry.getKey(), entry.getValue())); + assertEquals(50, num); + } + + int num2 = 20; + String url2 = initURLWithLeastConn(); + String lastAddress = ""; + for (int i = 0; i < num2; i++) { + PgConnection connection = getConnection(url2, TestUtil.getUser(), TestUtil.getPassword()); + String socketAddress = connection.getSocketAddress().split("/")[1]; + dns.put(socketAddress, dns.getOrDefault(socketAddress, 0) + 1); + if (i > 1) { + assertEquals(lastAddress, socketAddress); + } + lastAddress = socketAddress; + } + LOGGER.info(GT.tr("Create 20 connections to all nodes.")); + for (Entry entry : dns.entrySet()) { + LOGGER.info(GT.tr("{0} : {1}", entry.getKey(), entry.getValue())); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void checkConnectionStateTest() { + // If this testcase fail, maybe heartBeatingThread remove invalid connections. + try { + if (String.valueOf(TestUtil.getSecondaryPort()).equals(System.getProperty("def_pgport")) + || String.valueOf(TestUtil.getSecondaryServerPort2()).equals(System.getProperty("def_pgport"))) { + return; + } + Class.forName("org.postgresql.Driver"); + String url = initURLWithLeastConn(); + List pgConnections = new ArrayList<>(); + int total = 100; + int remove = 10; + for (int i = 0; i < total; i++) { + PgConnection connection = getConnection(url, USER, PASSWORD); + pgConnections.add(connection); + } + for (int i = 0; i < remove; i++) { + pgConnections.get(i).close(); + } + assertEquals(Integer.valueOf(remove), ConnectionManager.getInstance().checkConnectionsValidity().get(0)); + } catch (ClassNotFoundException | SQLException e) { + e.printStackTrace(); + fail(); + } + ConnectionManager.getInstance().clear(); + } + + @Test + public void setClusterTest() throws PSQLException { + HostSpec[] hostSpecs = initHostSpec(); + Properties properties = initPriority(hostSpecs); + assertFalse(ConnectionManager.getInstance().setCluster(properties)); + assertFalse(ConnectionManager.getInstance().setCluster(null)); + + properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + assertFalse(ConnectionManager.getInstance().setCluster(properties)); + + properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + assertTrue(ConnectionManager.getInstance().setCluster(properties)); + assertFalse(ConnectionManager.getInstance().setCluster(properties)); + ConnectionManager.getInstance().clear(); + } + + @Test + public void incrementCachedCreatingConnectionSize() throws PSQLException { + HostSpec[] hostSpecs = initHostSpec(); + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + ConnectionManager.getInstance().setCluster(properties); + assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(1, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], null)); + assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + properties.setProperty("autoBalance", ""); + assertEquals(0, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + properties.setProperty("autoBalance", "leastconn"); + assertEquals(0, ConnectionManager.getInstance() + .incrementCachedCreatingConnectionSize(new HostSpec("127.127.127.127", 93589), properties)); + ConnectionManager.getInstance().clear(); + } + + @Test + public void decrementCachedCreatingConnectionSize() throws PSQLException { + HostSpec[] hostSpecs = initHostSpec(); + Properties properties = initPriority(hostSpecs); + properties.setProperty("autoBalance", "leastconn"); + ConnectionManager.getInstance().setCluster(properties); + assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(1, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], null)); + assertEquals(1, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + assertEquals(2, ConnectionManager.getInstance().incrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + properties.setProperty("autoBalance", ""); + assertEquals(0, ConnectionManager.getInstance().decrementCachedCreatingConnectionSize(hostSpecs[0], properties)); + properties.setProperty("autoBalance", "leastconn"); + assertEquals(0, ConnectionManager.getInstance() + .decrementCachedCreatingConnectionSize(new HostSpec("127.127.127.127", 93589), properties)); + ConnectionManager.getInstance().clear(); + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/DataNodeTest.java b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/DataNodeTest.java new file mode 100644 index 0000000..5ebf83e --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/DataNodeTest.java @@ -0,0 +1,489 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.test.quickautobalance; + +import org.junit.Test; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.StatementCancelState; +import org.postgresql.quickautobalance.ConnectionInfo; +import org.postgresql.quickautobalance.DataNode; +import org.postgresql.quickautobalance.DataNode.CheckDnStateResult; +import org.postgresql.test.TestUtil; +import org.postgresql.util.HostSpec; +import org.postgresql.util.PSQLException; + +import java.lang.reflect.InvocationTargetException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * + */ +public class DataNodeTest { + private static final int DN_NUM = 3; + + private static final String FAKE_HOST = "127.127.217.217"; + + private static final String FAKE_PORT = "1"; + + private static final String FAKE_USER = "fakeuser"; + + private static final String FAKE_DATABASE = "fakedatabase"; + + private static final String FAKE_PASSWORD = "fakepassword"; + + private HostSpec initHost() { + return new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + } + + private PgConnection getConnection(String url, Properties properties) throws SQLException { + Connection connection = DriverManager.getConnection(url, properties); + assertTrue(connection instanceof PgConnection); + return (PgConnection) connection; + } + + private Properties initProperties() { + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + properties.setProperty("autoBalance", "leastconn"); + return properties; + } + + private HostSpec[] initHostSpecs() { + HostSpec[] hostSpecs = new HostSpec[DN_NUM]; + hostSpecs[0] = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + hostSpecs[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort()); + hostSpecs[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2()); + return hostSpecs; + } + + private boolean checkHostSpecs(HostSpec[] hostSpecs) { + if (hostSpecs.length != DN_NUM) { + return false; + } + for (int i = 0; i < DN_NUM; i++) { + if ("localhost".equals(hostSpecs[i].getHost())) { + return false; + } + } + return true; + } + + @Test + public void setConnectionStateTest() throws SQLException { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + Properties properties = initProperties(); + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + + dataNode.setConnectionState(pgConnection, StatementCancelState.IDLE); + assertEquals(StatementCancelState.IDLE, dataNode.getConnectionInfo(pgConnection).getConnectionState()); + + dataNode.setConnectionState(pgConnection, StatementCancelState.IN_QUERY); + assertEquals(StatementCancelState.IN_QUERY, dataNode.getConnectionInfo(pgConnection).getConnectionState()); + + dataNode.setConnectionState(pgConnection, StatementCancelState.CANCELLED); + assertEquals(StatementCancelState.CANCELLED, dataNode.getConnectionInfo(pgConnection).getConnectionState()); + + dataNode.setConnectionState(pgConnection, StatementCancelState.CANCELING); + assertEquals(StatementCancelState.CANCELING, dataNode.getConnectionInfo(pgConnection).getConnectionState()); + } + + @Test + public void setConnectionTest() throws SQLException { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + Properties properties = initProperties(); + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection); + assertNotNull(connectionInfo); + assertEquals(pgConnection, connectionInfo.getPgConnection()); + + dataNode.setConnection(null, properties, hostSpec); + dataNode.setConnection(pgConnection, null, hostSpec); + dataNode.setConnection(pgConnection, properties, null); + } + + @Test + public void getConnectionTest() throws SQLException { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + Properties properties = initProperties(); + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection); + assertNotNull(connectionInfo); + assertEquals(pgConnection, connectionInfo.getPgConnection()); + + connectionInfo = dataNode.getConnectionInfo(null); + assertNull(connectionInfo); + } + + @Test + public void getCachedConnectionListSizeTest() throws SQLException { + int num = 10; + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + Properties properties = initProperties(); + assertEquals(0, dataNode.getCachedConnectionListSize()); + for (int i = 0; i < num; i++) { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + } + int result = dataNode.getCachedConnectionListSize(); + assertEquals(num, result); + } + + @Test + public void checkDnStateWithPropertiesSuccessTest() { + HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + DataNode dataNode = new DataNode(hostSpec); + try { + assertTrue(dataNode.checkDnState(properties)); + } catch (InvocationTargetException | PSQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void checkDnStateWithPropertiesUsernameOrPasswordErrorTest() { + HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", FAKE_USER); + properties.setProperty("password", FAKE_PASSWORD); + DataNode dataNode = new DataNode(hostSpec); + try { + dataNode.checkDnState(properties); + } catch (InvocationTargetException e) { + assertTrue(e.getCause() instanceof PSQLException); + if (e.getCause() instanceof PSQLException) { + PSQLException psqlException = (PSQLException) (e.getCause()); + String sqlState = psqlException.getSQLState(); + assertEquals("28P01", sqlState); + } else { + e.printStackTrace(); + fail(); + } + } catch (PSQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void checkDnStateWithPropertiesDatabaseErrorTest() { + // Invalid parameter "PGDBNAME" doesn't affect to tryConnect(). + HostSpec hostSpec = new HostSpec(TestUtil.getServer(), TestUtil.getPort()); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", FAKE_DATABASE); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("PGPORT", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOST", TestUtil.getServer()); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + DataNode dataNode = new DataNode(hostSpec); + try { + assertTrue(dataNode.checkDnState(properties)); + } catch (InvocationTargetException | PSQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void checkDnStateWithPropertiesConnectionFailedTest() { + HostSpec hostSpec = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT)); + Properties properties = new Properties(); + properties.setProperty("PGDBNAME", TestUtil.getDatabase()); + properties.setProperty("PGHOSTURL", FAKE_HOST); + properties.setProperty("PGPORT", FAKE_PORT); + properties.setProperty("PGPORTURL", FAKE_PORT); + properties.setProperty("PGHOST", FAKE_HOST); + properties.setProperty("user", TestUtil.getUser()); + properties.setProperty("password", TestUtil.getPassword()); + DataNode dataNode = new DataNode(hostSpec); + try { + dataNode.checkDnState(properties); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof PSQLException) { + PSQLException psqlException = (PSQLException) (e.getCause()); + String SQLState = psqlException.getSQLState(); + assertNotEquals("28P01", SQLState); + } else { + assertTrue(true); + } + } catch (PSQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void checkDnStateWithHostSpecSuccessTest() { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties clusterProperties = new Properties(); + clusterProperties.setProperty("user", TestUtil.getUser()); + clusterProperties.setProperty("password", TestUtil.getPassword()); + clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + clusterProperties.setProperty("PGHOSTURL", pgHostUrl); + clusterProperties.setProperty("PGPORT", pgPortUrl); + clusterProperties.setProperty("PGPORTURL", pgPortUrl); + clusterProperties.setProperty("PGHOST", pgHostUrl); + DataNode dataNode = new DataNode(new HostSpec(TestUtil.getServer(), TestUtil.getPort())); + CheckDnStateResult result = dataNode.checkDnStateAndProperties(clusterProperties); + assertEquals(CheckDnStateResult.DN_VALID, result); + } + + @Test + public void checkDnStateWithHostSpecUserOrPasswordExpiredTest() { + HostSpec[] hostSpecs = initHostSpecs(); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + String pgHostUrl = hostSpecs[0].getHost() + "," + hostSpecs[1].getHost() + "," + hostSpecs[2].getHost(); + String pgPortUrl = hostSpecs[0].getPort() + "," + hostSpecs[1].getPort() + "," + hostSpecs[2].getPort(); + Properties invalidProperties = new Properties(); + invalidProperties.setProperty("user", FAKE_USER); + invalidProperties.setProperty("password", FAKE_PASSWORD); + invalidProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + invalidProperties.setProperty("PGHOSTURL", pgHostUrl); + invalidProperties.setProperty("PGPORT", pgPortUrl); + invalidProperties.setProperty("PGPORTURL", pgPortUrl); + invalidProperties.setProperty("PGHOST", pgHostUrl); + DataNode dataNode = new DataNode(new HostSpec(TestUtil.getServer(), TestUtil.getPort())); + CheckDnStateResult result = dataNode.checkDnStateAndProperties(invalidProperties); + assertEquals(CheckDnStateResult.PROPERTIES_INVALID, result); + } + + @Test + public void checkDnStateWithHostSpecConnectFailedTest() { + HostSpec[] hostSpecs = initHostSpecs(); + HostSpec hostSpec = new HostSpec(FAKE_HOST, Integer.parseInt(FAKE_PORT)); + if (!checkHostSpecs(hostSpecs)) { + return; + } + Arrays.sort(hostSpecs); + Properties clusterProperties = new Properties(); + clusterProperties.setProperty("user", TestUtil.getUser()); + clusterProperties.setProperty("password", TestUtil.getPassword()); + clusterProperties.setProperty("PGDBNAME", TestUtil.getDatabase()); + clusterProperties.setProperty("PGHOSTURL", FAKE_HOST); + clusterProperties.setProperty("PGPORT", FAKE_PORT); + clusterProperties.setProperty("PGPORTURL", FAKE_PORT); + clusterProperties.setProperty("PGHOST", FAKE_HOST); + DataNode dataNode = new DataNode(hostSpec); + CheckDnStateResult result = dataNode.checkDnStateAndProperties(clusterProperties); + assertEquals(CheckDnStateResult.DN_INVALID, result); + } + + @Test + public void checkConnectionValidityTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + int total = 10; + int remove = 2; + List pgConnections = new ArrayList<>(); + Properties properties = initProperties(); + try { + for (int i = 0; i < total; i++) { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + pgConnections.add(pgConnection); + } + assertEquals(0, dataNode.checkConnectionsValidity()); + assertEquals(10, dataNode.getCachedConnectionListSize()); + for (int i = 0; i < remove; i++) { + pgConnections.get(i).close(); + } + assertEquals(2, dataNode.checkConnectionsValidity()); + assertEquals(10 - 2, dataNode.getCachedConnectionListSize()); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void filterIdleConnectionsTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + int idleSize = 10; + int querySize = 20; + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + for (int i = 0; i < idleSize; i++) { + try { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + for (int i = 0; i < querySize; i++) { + try { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection); + connectionInfo.setConnectionState(StatementCancelState.IN_QUERY); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + List connectionInfos = dataNode.filterIdleConnections(System.currentTimeMillis()); + assertEquals(idleSize, connectionInfos.size()); + for (ConnectionInfo connectionInfo : connectionInfos) { + assertEquals(StatementCancelState.IDLE, connectionInfo.getConnectionState()); + } + } + + @Test + public void getAndSetDNStateTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + assertTrue(dataNode.getDataNodeState()); + dataNode.setDataNodeState(false); + assertFalse(dataNode.getDataNodeState()); + } + + @Test + public void clearCachedConnectionsTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + int idleSize = 10; + Properties properties = initProperties(); + properties.setProperty("enableQuickAutoBalance", "true"); + properties.setProperty("maxIdleTimeBeforeTerminal", "1"); + for (int i = 0; i < idleSize; i++) { + try { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + ConnectionInfo connectionInfo = dataNode.getConnectionInfo(pgConnection); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + assertEquals(idleSize, dataNode.getCachedConnectionListSize()); + dataNode.clearCachedConnections(); + assertEquals(0, dataNode.getCachedConnectionListSize()); + } + + @Test + public void closeConnectionsTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + int total = 10; + int closed = 5; + Properties properties = initProperties(); + List connectionList = new ArrayList<>(); + for (int i = 0; i < total; i++) { + try { + PgConnection pgConnection = getConnection(TestUtil.getURL(), properties); + dataNode.setConnection(pgConnection, properties, hostSpec); + connectionList.add(pgConnection); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + assertEquals(total, dataNode.getCachedConnectionListSize()); + for (int i = 0; i < total; i++) { + try { + assertTrue(connectionList.get(i).isValid(4)); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + for (int i = 0; i < closed; i++) { + assertTrue(dataNode.closeConnection(connectionList.get(i))); + } + assertEquals(total - closed, dataNode.getCachedConnectionListSize()); + for (int i = 0; i < closed; i++) { + try { + assertFalse(connectionList.get(i).isValid(4)); + } catch (SQLException e) { + e.printStackTrace(); + fail(); + } + } + } + + @Test + public void getAndSetCachedCreatingConnectionSizeTest() { + HostSpec hostSpec = initHost(); + DataNode dataNode = new DataNode(hostSpec); + assertEquals(0, dataNode.getCachedCreatingConnectionSize()); + assertEquals(1, dataNode.incrementCachedCreatingConnectionSize()); + assertEquals(1, dataNode.getCachedCreatingConnectionSize()); + assertEquals(0, dataNode.decrementCachedCreatingConnectionSize()); + assertEquals(0, dataNode.getCachedCreatingConnectionSize()); + assertEquals(0, dataNode.decrementCachedCreatingConnectionSize()); + } +} diff --git a/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/LoadBalanceHeartBeatingTest.java b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/LoadBalanceHeartBeatingTest.java new file mode 100644 index 0000000..33dc5ec --- /dev/null +++ b/pgjdbc/src/test/java/org/postgresql/test/quickautobalance/LoadBalanceHeartBeatingTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.postgresql.test.quickautobalance; + +import org.junit.Test; +import org.postgresql.quickautobalance.LoadBalanceHeartBeating; +import org.postgresql.test.TestUtil; + +import java.util.Properties; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * LoadBalanceHeartBeatingTest + */ +public class LoadBalanceHeartBeatingTest { + private Properties initProperties() { + Properties properties = new Properties(); + properties.setProperty("PGPORTURL", TestUtil.getPort() + "," + + TestUtil.getSecondaryPort() + "," + TestUtil.getSecondaryServerPort2()); + properties.setProperty("PGHOSTURL", TestUtil.getServer() + "," + + TestUtil.getSecondaryServer() + "," + TestUtil.getSecondaryServer2()); + return properties; + } + + @Test + public void startCheckConnectionScheduledExecutorServiceSuccessTest() { + Properties properties = initProperties(); + properties.setProperty("autoBalance", "leastconn"); + assertFalse(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.stop(); + } + + @Test + public void startCloseConnectionExecutorServiceSuccessTest() { + Properties properties = initProperties(); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("enableQuickAutoBalance", "true"); + assertFalse(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertTrue(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertTrue(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.stop(); + } + + @Test + public void startCloseConnectionExecutorServiceFailedTest() { + Properties properties = initProperties(); + properties.setProperty("autoBalance", "leastconn"); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + properties.setProperty("enableQuickAutoBalance", "fsfsfs"); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + properties.setProperty("enableQuickAutoBalance", "false"); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertTrue(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.stop(); + } + + @Test + public void startExecutorServiceWithSingleHostTest() { + Properties properties = new Properties(); + properties.setProperty("PGPORTURL", String.valueOf(TestUtil.getPort())); + properties.setProperty("PGHOSTURL", TestUtil.getServer()); + properties.setProperty("autoBalance", "leastconn"); + properties.setProperty("enableQuickAutoBalance", "true"); + assertFalse(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertFalse(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + LoadBalanceHeartBeating.startScheduledExecutorService(properties); + assertFalse(LoadBalanceHeartBeating.isLeastConnStarted()); + assertFalse(LoadBalanceHeartBeating.isQuickAutoBalanceStarted()); + } +}