feat: add statement-level read-write splitting

This commit is contained in:
ZhangCheng
2023-09-22 11:09:21 +08:00
parent 4d31a85a08
commit 0a8165fb6b
13 changed files with 2341 additions and 1 deletions

View File

@ -32,6 +32,7 @@
<jdbc.specification.version.nodot>42</jdbc.specification.version.nodot> <jdbc.specification.version.nodot>42</jdbc.specification.version.nodot>
<skip.assembly>false</skip.assembly> <skip.assembly>false</skip.assembly>
<checkstyle.version>8.5</checkstyle.version> <checkstyle.version>8.5</checkstyle.version>
<shardingsphere.version>5.4.0</shardingsphere.version>
</properties> </properties>
<dependencies> <dependencies>
@ -47,6 +48,18 @@
<version>1.7.30</version> <version>1.7.30</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-parser-sql-engine</artifactId>
<version>${shardingsphere.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-parser-sql-opengauss</artifactId>
<version>${shardingsphere.version}</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -13,6 +13,7 @@ import org.postgresql.log.Log;
import org.postgresql.log.Tracer; import org.postgresql.log.Tracer;
import org.postgresql.quickautobalance.ConnectionManager; import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.LoadBalanceHeartBeating; import org.postgresql.quickautobalance.LoadBalanceHeartBeating;
import org.postgresql.readwritesplitting.ReadWriteSplittingPgConnection;
import org.postgresql.util.DriverInfo; import org.postgresql.util.DriverInfo;
import org.postgresql.util.GT; import org.postgresql.util.GT;
import org.postgresql.util.HostSpec; import org.postgresql.util.HostSpec;
@ -561,6 +562,9 @@ public class Driver implements java.sql.Driver {
*/ */
private static Connection makeConnection(String url, Properties props) throws SQLException { private static Connection makeConnection(String url, Properties props) throws SQLException {
ConnectionManager.getInstance().setCluster(props); ConnectionManager.getInstance().setCluster(props);
if (PGProperty.ENABLE_STATEMENT_LOAD_BALANCE.getBoolean(props)) {
return new ReadWriteSplittingPgConnection(hostSpecs(props), props, user(props), database(props), url);
}
PgConnection pgConnection = new PgConnection(hostSpecs(props), user(props), database(props), props, url); PgConnection pgConnection = new PgConnection(hostSpecs(props), user(props), database(props), props, url);
GlobalConnectionTracker.possessConnectionReference(pgConnection.getQueryExecutor(), props); GlobalConnectionTracker.possessConnectionReference(pgConnection.getQueryExecutor(), props);
LoadBalanceHeartBeating.setConnection(pgConnection, props); LoadBalanceHeartBeating.setConnection(pgConnection, props);

View File

@ -509,6 +509,20 @@ public enum PGProperty {
+ "Value range: int && [0, 100]." + "Value range: int && [0, 100]."
+ "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"), + "This parameter only takes effect when autoBalance=leastconn and enableQuickAutoBalance=true"),
/**
* Enable statement load balance.
*/
ENABLE_STATEMENT_LOAD_BALANCE("enableStatementLoadBalance", "false",
"Enable statement-level load balancing configuration, "
+ "so that load balancing routing will be performed when each SQL statement is executed."
+ "Optional values: true or false.",
false, "true", "false"),
/**
* Write data source address.
*/
WRITE_DATA_SOURCE_ADDRESS("writeDataSourceAddress", "", "Specify the host and port for write database", false),
/** /**
* Supported TLS cipher suites * Supported TLS cipher suites
*/ */

View File

@ -0,0 +1,624 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.jdbc;
import org.postgresql.readwritesplitting.ReadWriteSplittingPgConnection;
import org.postgresql.readwritesplitting.SqlRouteEngine;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
import java.sql.NClob;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
/**
* Read write splitting PG prepared statement
*
* @since 2023-11-20
*/
public class ReadWriteSplittingPgPreparedStatement implements PreparedStatement {
private final PreparedStatement pgPreparedStatement;
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql);
}
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @param autoGeneratedKeys auto generated keys
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql, int autoGeneratedKeys) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql, autoGeneratedKeys);
}
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @param resultSetType result set type
* @param resultSetConcurrency result set concurrency
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql, resultSetType, resultSetConcurrency);
}
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @param columnIndexes column indexes
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql, int[] columnIndexes) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql, columnIndexes);
}
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @param columnNames column names
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql, String[] columnNames) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql, columnNames);
}
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting pg connection
* @param sql SQL
* @param resultSetType result set type
* @param resultSetConcurrency result set concurrency
* @param resultSetHoldability result set holdability
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgPreparedStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
Connection pgConnection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
pgPreparedStatement = pgConnection.prepareStatement(sql, resultSetType, resultSetConcurrency,
resultSetHoldability);
}
@Override
public ResultSet executeQuery() throws SQLException {
return pgPreparedStatement.executeQuery();
}
@Override
public int executeUpdate() throws SQLException {
return pgPreparedStatement.executeUpdate();
}
@Override
public void setNull(int parameterIndex, int sqlType) throws SQLException {
pgPreparedStatement.setNull(parameterIndex, sqlType);
}
@Override
public void setBoolean(int parameterIndex, boolean isTrue) throws SQLException {
pgPreparedStatement.setBoolean(parameterIndex, isTrue);
}
@Override
public void setByte(int parameterIndex, byte x) throws SQLException {
pgPreparedStatement.setByte(parameterIndex, x);
}
@Override
public void setShort(int parameterIndex, short x) throws SQLException {
pgPreparedStatement.setShort(parameterIndex, x);
}
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
pgPreparedStatement.setInt(parameterIndex, x);
}
@Override
public void setLong(int parameterIndex, long x) throws SQLException {
pgPreparedStatement.setLong(parameterIndex, x);
}
@Override
public void setFloat(int parameterIndex, float x) throws SQLException {
pgPreparedStatement.setFloat(parameterIndex, x);
}
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
pgPreparedStatement.setDouble(parameterIndex, x);
}
@Override
public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
pgPreparedStatement.setBigDecimal(parameterIndex, x);
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
pgPreparedStatement.setString(parameterIndex, x);
}
@Override
public void setBytes(int parameterIndex, byte[] x) throws SQLException {
pgPreparedStatement.setBytes(parameterIndex, x);
}
@Override
public void setDate(int parameterIndex, Date x) throws SQLException {
pgPreparedStatement.setDate(parameterIndex, x);
}
@Override
public void setTime(int parameterIndex, Time x) throws SQLException {
pgPreparedStatement.setTime(parameterIndex, x);
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
pgPreparedStatement.setTimestamp(parameterIndex, x);
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
pgPreparedStatement.setAsciiStream(parameterIndex, x, length);
}
@Override
public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
pgPreparedStatement.setUnicodeStream(parameterIndex, x, length);
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
pgPreparedStatement.setBinaryStream(parameterIndex, x, length);
}
@Override
public void clearParameters() throws SQLException {
pgPreparedStatement.clearParameters();
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
pgPreparedStatement.setObject(parameterIndex, x, targetSqlType);
}
@Override
public void setObject(int parameterIndex, Object x) throws SQLException {
pgPreparedStatement.setObject(parameterIndex, x);
}
@Override
public boolean execute() throws SQLException {
return pgPreparedStatement.execute();
}
@Override
public void addBatch() throws SQLException {
pgPreparedStatement.addBatch();
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
pgPreparedStatement.setCharacterStream(parameterIndex, reader, length);
}
@Override
public void setRef(int parameterIndex, Ref x) throws SQLException {
pgPreparedStatement.setRef(parameterIndex, x);
}
@Override
public void setBlob(int parameterIndex, Blob x) throws SQLException {
pgPreparedStatement.setBlob(parameterIndex, x);
}
@Override
public void setClob(int parameterIndex, Clob x) throws SQLException {
pgPreparedStatement.setClob(parameterIndex, x);
}
@Override
public void setArray(int parameterIndex, Array x) throws SQLException {
pgPreparedStatement.setArray(parameterIndex, x);
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
return pgPreparedStatement.getMetaData();
}
@Override
public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
pgPreparedStatement.setDate(parameterIndex, x, cal);
}
@Override
public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
pgPreparedStatement.setTime(parameterIndex, x, cal);
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
pgPreparedStatement.setTimestamp(parameterIndex, x, cal);
}
@Override
public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
pgPreparedStatement.setNull(parameterIndex, sqlType, typeName);
}
@Override
public void setURL(int parameterIndex, URL x) throws SQLException {
pgPreparedStatement.setURL(parameterIndex, x);
}
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
return pgPreparedStatement.getParameterMetaData();
}
@Override
public void setRowId(int parameterIndex, RowId x) throws SQLException {
pgPreparedStatement.setRowId(parameterIndex, x);
}
@Override
public void setNString(int parameterIndex, String value) throws SQLException {
pgPreparedStatement.setNString(parameterIndex, value);
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
pgPreparedStatement.setNCharacterStream(parameterIndex, value, length);
}
@Override
public void setNClob(int parameterIndex, NClob value) throws SQLException {
pgPreparedStatement.setNClob(parameterIndex, value);
}
@Override
public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
pgPreparedStatement.setClob(parameterIndex, reader, length);
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
pgPreparedStatement.setBlob(parameterIndex, inputStream, length);
}
@Override
public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
pgPreparedStatement.setNClob(parameterIndex, reader, length);
}
@Override
public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
pgPreparedStatement.setSQLXML(parameterIndex, xmlObject);
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
pgPreparedStatement.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
pgPreparedStatement.setAsciiStream(parameterIndex, x, length);
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
pgPreparedStatement.setBinaryStream(parameterIndex, x, length);
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
pgPreparedStatement.setCharacterStream(parameterIndex, reader, length);
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
pgPreparedStatement.setAsciiStream(parameterIndex, x);
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
pgPreparedStatement.setBinaryStream(parameterIndex, x);
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
pgPreparedStatement.setCharacterStream(parameterIndex, reader);
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
pgPreparedStatement.setNCharacterStream(parameterIndex, value);
}
@Override
public void setClob(int parameterIndex, Reader reader) throws SQLException {
pgPreparedStatement.setClob(parameterIndex, reader);
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
pgPreparedStatement.setBlob(parameterIndex, inputStream);
}
@Override
public void setNClob(int parameterIndex, Reader reader) throws SQLException {
pgPreparedStatement.setNClob(parameterIndex, reader);
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return pgPreparedStatement.executeQuery(sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return pgPreparedStatement.executeUpdate(sql);
}
@Override
public void close() throws SQLException {
pgPreparedStatement.close();
}
@Override
public int getMaxFieldSize() throws SQLException {
return pgPreparedStatement.getMaxFieldSize();
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
pgPreparedStatement.setMaxFieldSize(max);
}
@Override
public int getMaxRows() throws SQLException {
return pgPreparedStatement.getMaxRows();
}
@Override
public void setMaxRows(int max) throws SQLException {
pgPreparedStatement.setMaxRows(max);
}
@Override
public void setEscapeProcessing(boolean isEnable) throws SQLException {
pgPreparedStatement.setEscapeProcessing(isEnable);
}
@Override
public int getQueryTimeout() throws SQLException {
return pgPreparedStatement.getQueryTimeout();
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
pgPreparedStatement.setQueryTimeout(seconds);
}
@Override
public void cancel() throws SQLException {
pgPreparedStatement.cancel();
}
@Override
public SQLWarning getWarnings() throws SQLException {
return pgPreparedStatement.getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
pgPreparedStatement.clearWarnings();
}
@Override
public void setCursorName(String name) throws SQLException {
pgPreparedStatement.setCursorName(name);
}
@Override
public boolean execute(String sql) throws SQLException {
return pgPreparedStatement.execute(sql);
}
@Override
public ResultSet getResultSet() throws SQLException {
return pgPreparedStatement.getResultSet();
}
@Override
public int getUpdateCount() throws SQLException {
return pgPreparedStatement.getUpdateCount();
}
@Override
public boolean getMoreResults() throws SQLException {
return pgPreparedStatement.getMoreResults();
}
@Override
public int getFetchDirection() throws SQLException {
return pgPreparedStatement.getFetchDirection();
}
@Override
public void setFetchDirection(int direction) throws SQLException {
pgPreparedStatement.setFetchDirection(direction);
}
@Override
public int getFetchSize() throws SQLException {
return pgPreparedStatement.getFetchSize();
}
@Override
public void setFetchSize(int rows) throws SQLException {
pgPreparedStatement.setFetchSize(rows);
}
@Override
public int getResultSetConcurrency() throws SQLException {
return pgPreparedStatement.getResultSetConcurrency();
}
@Override
public int getResultSetType() throws SQLException {
return pgPreparedStatement.getResultSetType();
}
@Override
public void addBatch(String sql) throws SQLException {
pgPreparedStatement.addBatch(sql);
}
@Override
public void clearBatch() throws SQLException {
pgPreparedStatement.clearBatch();
}
@Override
public int[] executeBatch() throws SQLException {
return pgPreparedStatement.executeBatch();
}
@Override
public Connection getConnection() throws SQLException {
return pgPreparedStatement.getConnection();
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return pgPreparedStatement.getMoreResults(current);
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return pgPreparedStatement.getGeneratedKeys();
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return pgPreparedStatement.executeUpdate(sql, autoGeneratedKeys);
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return pgPreparedStatement.executeUpdate(sql, columnIndexes);
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return pgPreparedStatement.executeUpdate(sql, columnNames);
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return pgPreparedStatement.execute(sql, autoGeneratedKeys);
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return pgPreparedStatement.execute(sql, columnIndexes);
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
return pgPreparedStatement.execute(sql, columnNames);
}
@Override
public int getResultSetHoldability() throws SQLException {
return pgPreparedStatement.getResultSetHoldability();
}
@Override
public boolean isClosed() throws SQLException {
return pgPreparedStatement.isClosed();
}
@Override
public boolean isPoolable() throws SQLException {
return pgPreparedStatement.isPoolable();
}
@Override
public void setPoolable(boolean isPoolable) throws SQLException {
pgPreparedStatement.setPoolable(isPoolable);
}
@Override
public void closeOnCompletion() throws SQLException {
pgPreparedStatement.closeOnCompletion();
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return pgPreparedStatement.isCloseOnCompletion();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return pgPreparedStatement.unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return pgPreparedStatement.isWrapperFor(iface);
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import java.sql.SQLException;
/**
* Force execute callback.
*
* @since 2023-11-20
* @param <T> type of target to be executed
*/
public interface ForceExecuteCallback<T> {
/**
* Execute.
*
* @param target target to be executed
* @throws SQLException SQL exception
*/
void execute(T target) throws SQLException;
}

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
/**
* Force execute template.
*
* @since 2023-11-20
* @param <T> type of targets to be executed
*/
public final class ForceExecuteTemplate<T> {
/**
* Force execute.
*
* @param targets targets to be executed
* @param callback force execute callback
* @throws SQLException throw SQL exception after all targets are executed
*/
public void execute(final Collection<T> targets, final ForceExecuteCallback<T> callback) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (T each : targets) {
try {
callback.execute(each);
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}
private void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
if (exceptions.isEmpty()) {
return;
}
SQLException ex = new SQLException("");
exceptions.forEach(ex::setNextException);
throw ex;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* Method invocation recorder.
*
* @since 2023-11-20
* @param <T> type of target
*/
public final class MethodInvocationRecorder<T> {
private final Map<String, ForceExecuteCallback<T>> methodInvocations = new LinkedHashMap<>();
/**
* Record method invocation.
*
* @param methodName method name
* @param callback callback
*/
public void record(final String methodName, final ForceExecuteCallback<T> callback) {
methodInvocations.put(methodName, callback);
}
/**
* Replay methods invocation.
*
* @param target target object
* @throws SQLException SQL Exception
*/
public void replay(final T target) throws SQLException {
for (ForceExecuteCallback<T> each : methodInvocations.values()) {
each.execute(target);
}
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import org.postgresql.hostchooser.HostRequirement;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.util.HostSpec;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
/**
* PG connection manager.
*
* @since 2023-11-20
*/
public class PgConnectionManager implements AutoCloseable {
private final MethodInvocationRecorder<Connection> methodInvocationRecorder = new MethodInvocationRecorder<>();
private final ForceExecuteTemplate<PgConnection> forceExecuteTemplate = new ForceExecuteTemplate<>();
private final Map<String, PgConnection> cachedConnections = new ConcurrentHashMap<>();
private final AtomicReference<PgConnection> currentConnection = new AtomicReference<>();
private final Properties props;
private final String user;
private final String database;
private final String url;
private final ReadWriteSplittingPgConnection readWriteSplittingPgConnection;
/**
* Constructor.
*
* @param props props
* @param user user
* @param database database
* @param url url
* @param connection read write splitting pg connection
*/
public PgConnectionManager(Properties props, String user, String database, String url,
ReadWriteSplittingPgConnection connection) {
this.props = props;
this.user = user;
this.database = database;
this.url = url;
this.readWriteSplittingPgConnection = connection;
}
/**
* Get connection.
*
* @param hostSpec host spec
* @return connection
* @throws SQLException SQL exception
*/
public synchronized PgConnection getConnection(HostSpec hostSpec) throws SQLException {
String cacheKey = getCacheKey(hostSpec);
PgConnection result = cachedConnections.get(cacheKey);
if (result == null) {
result = createConnection(hostSpec, cacheKey);
}
setCurrentConnection(result);
return result;
}
private PgConnection createConnection(HostSpec hostSpec, String cacheKey) throws SQLException {
PgConnection result = new PgConnection(new HostSpec[]{hostSpec}, user, database, props, url);
methodInvocationRecorder.replay(result);
cachedConnections.put(cacheKey, result);
return result;
}
private void setCurrentConnection(PgConnection result) {
currentConnection.set(result);
}
/**
* Get current connection.
*
* @return current connection
* @throws SQLException SQL exception
*/
public PgConnection getCurrentConnection() throws SQLException {
PgConnection result = currentConnection.get();
return result == null ? getConnection(selectCurrentHostSpec()) : result;
}
private HostSpec selectCurrentHostSpec() {
ReadWriteSplittingHostSpec readWriteHostSpec = readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
if (HostRequirement.master == readWriteHostSpec.getTargetServerType()) {
return readWriteHostSpec.getWriteHostSpec();
}
if (HostRequirement.secondary == readWriteHostSpec.getTargetServerType()) {
return readWriteHostSpec.readLoadBalance();
}
return readWriteHostSpec.getWriteHostSpec();
}
private String getCacheKey(HostSpec hostSpec) {
return hostSpec.getHost() + ":" + hostSpec.getPort();
}
@Override
public void close() throws SQLException {
try {
forceExecuteTemplate.execute(cachedConnections.values(), PgConnection::close);
} finally {
cachedConnections.clear();
}
}
/**
* Set auto commit.
*
* @param isAutoCommit auto commit
* @throws SQLException SQL exception
*/
public void setAutoCommit(final boolean isAutoCommit) throws SQLException {
methodInvocationRecorder.record("setAutoCommit", target -> target.setAutoCommit(isAutoCommit));
forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setAutoCommit(isAutoCommit));
}
/**
* Set transaction isolation.
*
* @param level transaction isolation level
* @throws SQLException SQL exception
*/
public void setTransactionIsolation(int level) throws SQLException {
methodInvocationRecorder.record("setTransactionIsolation",
connection -> connection.setTransactionIsolation(level));
forceExecuteTemplate.execute(cachedConnections.values(),
connection -> connection.setTransactionIsolation(level));
}
/**
* Set schema.
*
* @param schema schema
* @throws SQLException SQL exception
*/
public void setSchema(String schema) throws SQLException {
methodInvocationRecorder.record("setSchema", connection -> connection.setSchema(schema));
forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setSchema(schema));
}
/**
* Commit.
*
* @throws SQLException SQL exception
*/
public void commit() throws SQLException {
forceExecuteTemplate.execute(cachedConnections.values(), Connection::commit);
}
/**
* Rollback.
*
* @throws SQLException SQL exception
*/
public void rollback() throws SQLException {
forceExecuteTemplate.execute(cachedConnections.values(), Connection::rollback);
}
/**
* Set read only.
*
* @param isReadOnly read only
* @throws SQLException SQL exception
*/
public void setReadOnly(final boolean isReadOnly) throws SQLException {
methodInvocationRecorder.record("setReadOnly", connection -> connection.setReadOnly(isReadOnly));
forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(isReadOnly));
}
/**
* Whether connection valid.
*
* @param timeout timeout
* @return connection valid or not
* @throws SQLException SQL exception
*/
public boolean isValid(final int timeout) throws SQLException {
for (Connection each : cachedConnections.values()) {
if (!each.isValid(timeout)) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import org.postgresql.hostchooser.HostChooser;
import org.postgresql.hostchooser.HostChooserFactory;
import org.postgresql.hostchooser.HostRequirement;
import org.postgresql.util.HostSpec;
import java.util.Properties;
/**
* Read write splitting host spec.
*
* @since 2023-11-20
*/
public class ReadWriteSplittingHostSpec {
private final HostSpec writeHostSpec;
private final HostSpec[] readHostSpecs;
private final HostRequirement targetServerType;
private final HostChooser readChooser;
/**
* Constructor.
*
* @param writeHostSpec write host spec
* @param hostSpecs host specs
* @param targetServerType target server type
* @param props props
*/
public ReadWriteSplittingHostSpec(HostSpec writeHostSpec, HostSpec[] hostSpecs, HostRequirement targetServerType,
Properties props) {
this.writeHostSpec = writeHostSpec;
this.readHostSpecs = createReadHostSpecs(hostSpecs, writeHostSpec);
this.targetServerType = targetServerType;
readChooser = HostChooserFactory.createHostChooser(readHostSpecs, targetServerType, props);
}
private HostSpec[] createReadHostSpecs(HostSpec[] hostSpecs, HostSpec writeHostSpec) {
int index = 0;
HostSpec[] result = new HostSpec[hostSpecs.length - 1];
for (HostSpec each : hostSpecs) {
if (!each.equals(writeHostSpec)) {
result[index++] = each;
}
}
return result;
}
/**
* Get write host spec.
*
* @return write host spec
*/
public HostSpec getWriteHostSpec() {
return writeHostSpec;
}
/**
* Get read host specs.
*
* @return read host specs
*/
public HostSpec[] getReadHostSpecs() {
return readHostSpecs;
}
/**
* Get target server type.
*
* @return target server type
*/
public HostRequirement getTargetServerType() {
return targetServerType;
}
/**
* Read load balance.
*
* @return routed host spec
*/
public HostSpec readLoadBalance() {
return readChooser.iterator().next().hostSpec;
}
}

View File

@ -0,0 +1,449 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import org.postgresql.PGProperty;
import org.postgresql.core.v3.ConnectionFactoryImpl;
import org.postgresql.hostchooser.HostRequirement;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.ReadWriteSplittingPgPreparedStatement;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.io.IOException;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* Read write splitting pg connection.
*
* @since 2023-11-20
*/
public class ReadWriteSplittingPgConnection implements Connection {
private final ReadWriteSplittingHostSpec readWriteSplittingHostSpec;
private final PgConnectionManager connectionManager;
private final Log LOGGER = Logger.getLogger(ReadWriteSplittingPgConnection.class.getName());
private volatile boolean isClosed;
private boolean isAutoCommit = true;
/**
* Constructor.
*
* @param hostSpecs host specs
* @param props props
* @param user user
* @param database database
* @param url url
* @throws SQLException SQL exception
*/
public ReadWriteSplittingPgConnection(HostSpec[] hostSpecs, Properties props, String user, String database,
String url) throws SQLException {
checkRequiredDependencies();
connectionManager = new PgConnectionManager(props, user, database, url, this);
readWriteSplittingHostSpec = new ReadWriteSplittingHostSpec(getWriteDataSourceAddress(props, hostSpecs),
hostSpecs, getTargetServerTypeParam(props), props);
}
private static void checkRequiredDependencies() throws PSQLException {
if (!isClassPresent("org.apache.shardingsphere.sql.parser.api.SQLParserEngine")) {
throw new PSQLException("When enableStatementLoadBalance=true, the dependency "
+ "shardingsphere-parser-sql-engine does not exist and this function cannot be used.",
PSQLState.UNEXPECTED_ERROR);
}
if (!isClassPresent("org.apache.shardingsphere.sql.parser.opengauss.parser.OpenGaussParserFacade")) {
throw new PSQLException("When enableStatementLoadBalance=true, the dependency "
+ "shardingsphere-parser-sql-opengauss does not exist and this function cannot be used.",
PSQLState.UNEXPECTED_ERROR);
}
}
/**
* Get target server type param.
*
* @param className Class name
* @return Whether class is present
*/
public static boolean isClassPresent(String className) {
try {
Class.forName(className);
return true;
} catch (ClassNotFoundException ignored) {
// Class or one of its dependencies is not present
return false;
}
}
private HostSpec getWriteDataSourceAddress(Properties props, HostSpec[] hostSpecs) throws SQLException {
String writeDataSourceAddress = PGProperty.WRITE_DATA_SOURCE_ADDRESS.get(props);
if (writeDataSourceAddress.trim().isEmpty()) {
return getWriteAddressByEstablishingConnections(hostSpecs);
}
String[] hostSpec = writeDataSourceAddress.split(":");
return new HostSpec(hostSpec[0], Integer.parseInt(hostSpec[1]));
}
private HostSpec getWriteAddressByEstablishingConnections(HostSpec[] hostSpecs) throws SQLException {
for (HostSpec each : hostSpecs) {
PgConnection connection = getConnectionManager().getConnection(each);
ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl();
try {
if (connectionFactory.isMaster(connection.getQueryExecutor())) {
return each;
}
} catch (IOException ex) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Error obtaining node role " + ex.getMessage());
LOGGER.debug(ex.getStackTrace());
}
}
}
throw new PSQLException(GT.tr("No write address found"), PSQLState.CONNECTION_UNABLE_TO_CONNECT);
}
private HostRequirement getTargetServerTypeParam(Properties info) throws PSQLException {
HostRequirement targetServerType;
String targetServerTypeStr = PGProperty.TARGET_SERVER_TYPE.get(info);
try {
targetServerType = HostRequirement.getTargetServerType(targetServerTypeStr);
} catch (IllegalArgumentException ex) {
throw new PSQLException(
GT.tr("Invalid targetServerType value: {0}", targetServerTypeStr),
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
}
return targetServerType;
}
/**
* Get read write splitting host spec.
*
* @return read write splitting host spec
*/
public ReadWriteSplittingHostSpec getReadWriteSplittingHostSpec() {
return readWriteSplittingHostSpec;
}
/**
* Get connection manager.
*
* @return the connectionManager
*/
public PgConnectionManager getConnectionManager() {
return connectionManager;
}
@Override
public Statement createStatement() throws SQLException {
return createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return createStatement(resultSetType, resultSetConcurrency, getHoldability());
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
return new ReadWriteSplittingPgStatement(this, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql, autoGeneratedKeys);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql, resultSetType,
resultSetConcurrency);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql, columnIndexes);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql, columnNames);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
return new ReadWriteSplittingPgPreparedStatement(this, sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return connectionManager.getCurrentConnection().prepareCall(sql);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return connectionManager.getCurrentConnection().prepareCall(sql, resultSetType, resultSetConcurrency);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
return connectionManager.getCurrentConnection().prepareCall(sql, resultSetType, resultSetConcurrency,
resultSetHoldability);
}
@Override
public void setAutoCommit(boolean isAutoCommit) throws SQLException {
this.isAutoCommit = isAutoCommit;
connectionManager.setAutoCommit(isAutoCommit);
}
@Override
public boolean getAutoCommit() throws SQLException {
return isAutoCommit;
}
@Override
public void commit() throws SQLException {
connectionManager.commit();
}
@Override
public void rollback() throws SQLException {
connectionManager.rollback();
}
@Override
public Savepoint setSavepoint() throws SQLException {
return connectionManager.getCurrentConnection().setSavepoint();
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return connectionManager.getCurrentConnection().setSavepoint(name);
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
connectionManager.getCurrentConnection().rollback(savepoint);
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
connectionManager.getCurrentConnection().releaseSavepoint(savepoint);
}
@Override
public void close() throws SQLException {
isClosed = true;
connectionManager.close();
}
@Override
public boolean isClosed() throws SQLException {
return isClosed;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return connectionManager.isValid(timeout);
}
@Override
public void setSchema(String schema) throws SQLException {
connectionManager.setSchema(schema);
}
@Override
public void setReadOnly(boolean isReadOnly) throws SQLException {
connectionManager.setReadOnly(isReadOnly);
}
@Override
public boolean isReadOnly() throws SQLException {
return connectionManager.getCurrentConnection().isReadOnly();
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
connectionManager.setTransactionIsolation(level);
}
@Override
public int getTransactionIsolation() throws SQLException {
return connectionManager.getCurrentConnection().getTransactionIsolation();
}
@Override
public String nativeSQL(String sql) throws SQLException {
return connectionManager.getCurrentConnection().nativeSQL(sql);
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
return connectionManager.getCurrentConnection().getMetaData();
}
@Override
public void setCatalog(String catalog) throws SQLException {
connectionManager.getCurrentConnection().setCatalog(catalog);
}
@Override
public String getCatalog() throws SQLException {
return connectionManager.getCurrentConnection().getCatalog();
}
@Override
public SQLWarning getWarnings() throws SQLException {
return connectionManager.getCurrentConnection().getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
connectionManager.getCurrentConnection().clearWarnings();
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return connectionManager.getCurrentConnection().getTypeMap();
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
connectionManager.getCurrentConnection().setTypeMap(map);
}
@Override
public void setHoldability(int holdability) throws SQLException {
connectionManager.getCurrentConnection().setHoldability(holdability);
}
@Override
public int getHoldability() throws SQLException {
return connectionManager.getCurrentConnection().getHoldability();
}
@Override
public Clob createClob() throws SQLException {
return connectionManager.getCurrentConnection().createClob();
}
@Override
public Blob createBlob() throws SQLException {
return connectionManager.getCurrentConnection().createBlob();
}
@Override
public NClob createNClob() throws SQLException {
return connectionManager.getCurrentConnection().createNClob();
}
@Override
public SQLXML createSQLXML() throws SQLException {
return connectionManager.getCurrentConnection().createSQLXML();
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
try {
connectionManager.getCurrentConnection().setClientInfo(name, value);
} catch (SQLException e) {
throw new SQLClientInfoException(Collections.emptyMap(), e);
}
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
try {
connectionManager.getCurrentConnection().setClientInfo(properties);
} catch (SQLException e) {
throw new SQLClientInfoException(Collections.emptyMap(), e);
}
}
@Override
public String getClientInfo(String name) throws SQLException {
return connectionManager.getCurrentConnection().getClientInfo(name);
}
@Override
public Properties getClientInfo() throws SQLException {
return connectionManager.getCurrentConnection().getClientInfo();
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return connectionManager.getCurrentConnection().createArrayOf(typeName, elements);
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return connectionManager.getCurrentConnection().createStruct(typeName, attributes);
}
@Override
public String getSchema() throws SQLException {
return connectionManager.getCurrentConnection().getSchema();
}
@Override
public void abort(Executor executor) throws SQLException {
connectionManager.getCurrentConnection().abort(executor);
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
connectionManager.getCurrentConnection().setNetworkTimeout(executor, milliseconds);
}
@Override
public int getNetworkTimeout() throws SQLException {
return connectionManager.getCurrentConnection().getNetworkTimeout();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return connectionManager.getCurrentConnection().unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return connectionManager.getCurrentConnection().isWrapperFor(iface);
}
}

View File

@ -0,0 +1,321 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
/**
* Read write splitting statement.
*
* @since 2023-11-20
*/
public class ReadWriteSplittingPgStatement implements Statement {
private final List<Statement> statements = new LinkedList<>();
private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
private final ReadWriteSplittingPgConnection readWriteSplittingPgConnection;
private final Integer resultSetType;
private final Integer resultSetConcurrency;
private final Integer resultSetHoldability;
private Statement currentStatement;
private ResultSet currentResultSet;
private boolean isClosed;
/**
* Constructor.
*
* @param readWriteSplittingPgConnection read write splitting connection
* @param resultSetType result set type
* @param resultSetConcurrency result set concurrency
* @param resultSetHoldability result set holdability
*/
public ReadWriteSplittingPgStatement(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
this.readWriteSplittingPgConnection = readWriteSplittingPgConnection;
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
Statement pgStatement = createPgStatement(sql);
ResultSet result = pgStatement.executeQuery(sql);
currentResultSet = result;
return result;
}
private Statement createPgStatement(String sql) throws SQLException {
Connection connection = SqlRouteEngine.getRoutedConnection(readWriteSplittingPgConnection, sql);
Statement statement = connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
statements.add(statement);
currentStatement = statement;
return statement;
}
/**
* Get current result set.
*
* @return current result set
* @throws SQLException SQL exception
*/
public Statement getCurrentStatement() throws SQLException {
if (currentStatement == null) {
Statement statement =
readWriteSplittingPgConnection.getConnectionManager().getCurrentConnection().createStatement();
statements.add(statement);
currentStatement = statement;
return statement;
} else {
return currentStatement;
}
}
@Override
public boolean execute(String sql) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.execute(sql);
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.execute(sql, autoGeneratedKeys);
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.execute(sql, columnIndexes);
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.execute(sql, columnNames);
}
@Override
public int executeUpdate(String sql) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.executeUpdate(sql);
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.executeUpdate(sql, autoGeneratedKeys);
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.executeUpdate(sql, columnIndexes);
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
Statement pgStatement = createPgStatement(sql);
return pgStatement.executeUpdate(sql, columnNames);
}
public Collection<Statement> getRoutedStatements() {
return statements;
}
@Override
public void close() throws SQLException {
isClosed = true;
try {
forceExecuteTemplate.execute(getRoutedStatements(), Statement::close);
} finally {
getRoutedStatements().clear();
}
}
@Override
public boolean isClosed() throws SQLException {
return isClosed;
}
@Override
public ResultSet getResultSet() throws SQLException {
return currentResultSet;
}
@Override
public int[] executeBatch() throws SQLException {
return getCurrentStatement().executeBatch();
}
@Override
public int getMaxFieldSize() throws SQLException {
return getCurrentStatement().getMaxFieldSize();
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
getCurrentStatement().setMaxFieldSize(max);
}
@Override
public int getMaxRows() throws SQLException {
return getCurrentStatement().getMaxRows();
}
@Override
public void setMaxRows(int max) throws SQLException {
getCurrentStatement().setMaxRows(max);
}
@Override
public void setEscapeProcessing(boolean isEnabled) throws SQLException {
getCurrentStatement().setEscapeProcessing(isEnabled);
}
@Override
public int getQueryTimeout() throws SQLException {
return getCurrentStatement().getQueryTimeout();
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
getCurrentStatement().setQueryTimeout(seconds);
}
@Override
public void cancel() throws SQLException {
forceExecuteTemplate.execute(getRoutedStatements(), Statement::cancel);
}
@Override
public SQLWarning getWarnings() throws SQLException {
return getCurrentStatement().getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
getCurrentStatement().clearWarnings();
}
@Override
public void setCursorName(String name) throws SQLException {
getCurrentStatement().setCursorName(name);
}
@Override
public int getUpdateCount() throws SQLException {
return getCurrentStatement().getUpdateCount();
}
@Override
public boolean getMoreResults() throws SQLException {
return getCurrentStatement().getMoreResults();
}
@Override
public void setFetchDirection(int direction) throws SQLException {
getCurrentStatement().setFetchDirection(direction);
}
@Override
public int getFetchDirection() throws SQLException {
return getCurrentStatement().getFetchDirection();
}
@Override
public void setFetchSize(int rows) throws SQLException {
getCurrentStatement().setFetchSize(rows);
}
@Override
public int getFetchSize() throws SQLException {
return getCurrentStatement().getFetchSize();
}
@Override
public int getResultSetConcurrency() throws SQLException {
return getCurrentStatement().getResultSetConcurrency();
}
@Override
public int getResultSetType() throws SQLException {
return getCurrentStatement().getResultSetType();
}
@Override
public void addBatch(String sql) throws SQLException {
getCurrentStatement().addBatch(sql);
}
@Override
public void clearBatch() throws SQLException {
getCurrentStatement().clearBatch();
}
@Override
public Connection getConnection() throws SQLException {
return readWriteSplittingPgConnection;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return getCurrentStatement().getMoreResults();
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return getCurrentStatement().getGeneratedKeys();
}
@Override
public int getResultSetHoldability() throws SQLException {
return getCurrentStatement().getResultSetHoldability();
}
@Override
public void setPoolable(boolean isPoolable) throws SQLException {
getCurrentStatement().setPoolable(isPoolable);
}
@Override
public boolean isPoolable() throws SQLException {
return getCurrentStatement().isPoolable();
}
@Override
public void closeOnCompletion() throws SQLException {
getCurrentStatement().closeOnCompletion();
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return getCurrentStatement().isCloseOnCompletion();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return getCurrentStatement().unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return getCurrentStatement().isWrapperFor(iface);
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.readwritesplitting;
import org.apache.shardingsphere.sql.parser.api.CacheOption;
import org.apache.shardingsphere.sql.parser.api.SQLParserEngine;
import org.apache.shardingsphere.sql.parser.api.SQLStatementVisitorEngine;
import org.apache.shardingsphere.sql.parser.core.ParseASTNode;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.handler.dml.SelectStatementHandler;
import org.postgresql.hostchooser.HostRequirement;
import org.postgresql.log.Log;
import org.postgresql.log.Logger;
import org.postgresql.util.HostSpec;
import java.sql.Connection;
import java.sql.SQLException;
/**
* SQL route engine.
*
* @since 2023-11-20
*/
public class SqlRouteEngine {
private static final String DATABASE_TYPE = "openGauss";
private static final SQLParserEngine PARSE_ENGINE = new SQLParserEngine(DATABASE_TYPE, new CacheOption(128, 1024L));
private static Log LOGGER = Logger.getLogger(SqlRouteEngine.class.getName());
/**
* Route SQL.
*
* @param readWriteSplittingPgConnection read write splitting PG Connection
* @param sql SQL
* @return routed connection
* @throws SQLException SQL exception
*/
public static Connection getRoutedConnection(ReadWriteSplittingPgConnection readWriteSplittingPgConnection,
String sql) throws SQLException {
HostSpec hostSpec = SqlRouteEngine.route(sql, readWriteSplittingPgConnection);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Routed connection host spec: " + hostSpec);
}
return readWriteSplittingPgConnection.getConnectionManager().getConnection(hostSpec);
}
/**
* Route SQL.
*
* @param sql SQL
* @param readWriteSplittingPgConnection read write splitting PG Connection
* @return host spec
* @throws SQLException sql exception
*/
public static HostSpec route(String sql, ReadWriteSplittingPgConnection readWriteSplittingPgConnection)
throws SQLException {
ReadWriteSplittingHostSpec hostSpec = readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
if (!readWriteSplittingPgConnection.getAutoCommit()) {
return hostSpec.getWriteHostSpec();
}
try {
if (HostRequirement.master == hostSpec.getTargetServerType()) {
return hostSpec.getWriteHostSpec();
}
if (HostRequirement.secondary == hostSpec.getTargetServerType()) {
return hostSpec.readLoadBalance();
}
ParseASTNode parseASTNode = PARSE_ENGINE.parse(sql, true);
SQLStatement sqlStatement = new SQLStatementVisitorEngine(DATABASE_TYPE, false).visit(parseASTNode);
if (isWriteRouteStatement(sqlStatement)) {
return hostSpec.getWriteHostSpec();
}
} catch (final Exception ignored) {
return hostSpec.getWriteHostSpec();
}
return hostSpec.readLoadBalance();
}
private static boolean isWriteRouteStatement(final SQLStatement sqlStatement) {
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement);
}
private static boolean containsLockSegment(final SQLStatement sqlStatement) {
return sqlStatement instanceof SelectStatement
&& SelectStatementHandler.getLockSegment((SelectStatement) sqlStatement).isPresent();
}
}

View File

@ -0,0 +1,423 @@
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
*/
package org.postgresql.test.readwritesplitting;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.postgresql.readwritesplitting.ReadWriteSplittingHostSpec;
import org.postgresql.readwritesplitting.ReadWriteSplittingPgConnection;
import org.postgresql.test.TestUtil;
import org.postgresql.util.HostSpec;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.instanceOf;
/**
* Read write splitting connection test.
*
* @since 2023-11-20
*/
public class ReadWriteSplittingConnectionTest {
private static final int DN_NUM = 3;
private static final String ACCOUNT_TABLE = "account";
private static HostSpec[] hostSpecs;
private static HostSpec writeHostSpec;
private static HostSpec[] readHostSpecs;
private int currentIndex;
private static HostSpec[] initHostSpecs() {
HostSpec[] result = new HostSpec[DN_NUM];
result[0] = getMasterHostSpec();
result[1] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
result[2] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2());
return result;
}
private static HostSpec[] initReadSpecs() {
HostSpec[] result = new HostSpec[DN_NUM - 1];
result[0] = new HostSpec(TestUtil.getSecondaryServer(), TestUtil.getSecondaryPort());
result[1] = new HostSpec(TestUtil.getSecondaryServer2(), TestUtil.getSecondaryServerPort2());
return result;
}
private static HostSpec[] getReadHostSpecs() {
return readHostSpecs;
}
@BeforeClass
public static void setUp() throws Exception {
hostSpecs = initHostSpecs();
readHostSpecs = initReadSpecs();
writeHostSpec = hostSpecs[0];
try (Connection connection = TestUtil.openDB()) {
TestUtil.createTable(connection, ACCOUNT_TABLE, "id int, balance float, transaction_id int");
TestUtil.execute("insert into account values(1, 1, 1)", connection);
}
}
@AfterClass
public static void tearDown() throws Exception {
try (Connection connection = TestUtil.openDB()) {
TestUtil.dropTable(connection, ACCOUNT_TABLE);
}
}
private static HostSpec getMasterHostSpec() {
return new HostSpec(TestUtil.getServer(), TestUtil.getPort());
}
private String initURL(HostSpec[] hostSpecs) {
String host1 = hostSpecs[0].getHost() + ":" + hostSpecs[0].getPort();
String host2 = hostSpecs[1].getHost() + ":" + hostSpecs[1].getPort();
String host3 = hostSpecs[2].getHost() + ":" + hostSpecs[2].getPort();
return "jdbc:postgresql://" + host1 + "," + host2 + "," + host3 + "/" + TestUtil.getDatabase();
}
private Connection getConnection(String urlParams) throws SQLException {
String url = initURL(hostSpecs) + urlParams;
Properties props = getProperties();
return DriverManager.getConnection(url, props);
}
@Test
public void roundRobinLoadBalanceTest() throws SQLException {
String urlParams = String.format("?enableStatementLoadBalance=true&autoBalance=roundrobin"
+ "&writeDataSourceAddress=%s", getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
ReadWriteSplittingHostSpec readWriteSplittingHostSpec =
readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
Assert.assertEquals(readWriteSplittingHostSpec.getWriteHostSpec(), getMasterHostSpec());
Assert.assertEquals(readWriteSplittingHostSpec.getReadHostSpecs(), getReadHostSpecs());
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
HostSpec actual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
for (int i = 0; i < readHostSpecs.length; i++) {
HostSpec firstExpected = getNextExpectedRoundRobinSpec();
if (firstExpected.equals(actual)) {
break;
}
}
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
for (int i = 0; i < 10; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
HostSpec actual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
Assert.assertEquals(getNextExpectedRoundRobinSpec(), actual);
}
}
for (int i = 0; i < 10; i++) {
String sql = "SELECT * FROM account WHERE id = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, "1");
Assert.assertTrue(statement.execute());
HostSpec actual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
Assert.assertEquals(getNextExpectedRoundRobinSpec(), actual);
}
}
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
private static ReadWriteSplittingPgConnection getReadWriteSplittingPgConnection(Connection connection) {
Assert.assertThat(connection, instanceOf(ReadWriteSplittingPgConnection.class));
if (connection instanceof ReadWriteSplittingPgConnection) {
return (ReadWriteSplittingPgConnection) connection;
}
throw new IllegalStateException("Unexpected connection type");
}
@Test
public void shuffleLoadBalanceTest() throws SQLException {
String urlParams = String.format("?enableStatementLoadBalance=true&autoBalance=shuffle&writeDataSourceAddress"
+ "=%s", getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
ReadWriteSplittingHostSpec readWriteSplittingHostSpec =
readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
Assert.assertEquals(readWriteSplittingHostSpec.getWriteHostSpec(), getMasterHostSpec());
Assert.assertEquals(readWriteSplittingHostSpec.getReadHostSpecs(), getReadHostSpecs());
for (int i = 0; i < 10; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 10; i++) {
String sql = "SELECT * FROM account WHERE id = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, "1");
Assert.assertTrue(statement.execute());
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
@Test
public void priorityLoadBalanceTest() throws SQLException {
String urlParams = String.format("?enableStatementLoadBalance=true&autoBalance=priority2"
+ "&writeDataSourceAddress=%s", getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
ReadWriteSplittingHostSpec readWriteSplittingHostSpec =
readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
Assert.assertEquals(readWriteSplittingHostSpec.getWriteHostSpec(), getMasterHostSpec());
Assert.assertEquals(readWriteSplittingHostSpec.getReadHostSpecs(), getReadHostSpecs());
HostSpec firstActual;
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
firstActual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
}
for (int i = 0; i < 10; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
HostSpec actual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
Assert.assertEquals(firstActual, actual);
}
}
for (int i = 0; i < 10; i++) {
String sql = "SELECT * FROM account WHERE id = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, "1");
Assert.assertTrue(statement.execute());
HostSpec actual = getRoutedReadHostSpec(readWriteSplittingPgConnection);
Assert.assertEquals(firstActual, actual);
}
}
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
@Test
public void targetServerTypeOfMasterTest() throws SQLException {
String urlParams = String.format("?enableStatementLoadBalance=true&autoBalance=shuffle&writeDataSourceAddress"
+ "=%s&targetServerType=master", getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
ReadWriteSplittingHostSpec readWriteSplittingHostSpec =
readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
Assert.assertEquals(readWriteSplittingHostSpec.getWriteHostSpec(), getMasterHostSpec());
Assert.assertEquals(readWriteSplittingHostSpec.getReadHostSpecs(), getReadHostSpecs());
for (int i = 0; i < 10; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 10; i++) {
String sql = "SELECT * FROM account WHERE id = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, "1");
Assert.assertTrue(statement.execute());
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
@Test
public void targetServerTypeOfSlaveTest() throws SQLException {
String urlParams = String.format("?enableStatementLoadBalance=true&autoBalance=shuffle&writeDataSourceAddress"
+ "=%s&targetServerType=slave", getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
ReadWriteSplittingHostSpec readWriteSplittingHostSpec =
readWriteSplittingPgConnection.getReadWriteSplittingHostSpec();
Assert.assertEquals(readWriteSplittingHostSpec.getWriteHostSpec(), getMasterHostSpec());
Assert.assertEquals(readWriteSplittingHostSpec.getReadHostSpecs(), getReadHostSpecs());
for (int i = 0; i < 10; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 10; i++) {
String sql = "SELECT * FROM account WHERE id = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, "1");
Assert.assertTrue(statement.execute());
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
try {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
} catch (SQLException e) {
Assert.assertTrue(e.getMessage().contains("ERROR: cannot execute UPDATE in a read-only "
+ "transaction"));
}
}
}
}
}
private static Properties getProperties() {
Properties properties = new Properties();
properties.setProperty("user", TestUtil.getUser());
properties.setProperty("password", TestUtil.getPassword());
return properties;
}
@Test
public void transactionRouteTest() throws SQLException {
String params = "?enableStatementLoadBalance=true&autoBalance=shuffle&writeDataSourceAddress=%s";
String urlParams = String.format(params, getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
connection.setAutoCommit(false);
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
connection.commit();
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
connection.rollback();
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
connection.setAutoCommit(true);
try (Statement statement = connection.createStatement()) {
Assert.assertTrue(statement.execute("SELECT * FROM account WHERE id = 1"));
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
try (Statement statement = connection.createStatement()) {
statement.execute("UPDATE account SET balance = 11 WHERE id = 1");
Assert.assertTrue(isRoutedToWriteHostSpecs(readWriteSplittingPgConnection));
}
}
}
@Test
public void executeMultiQueryByOneStatementTest() throws Exception {
String params = "?enableStatementLoadBalance=true&autoBalance=shuffle&writeDataSourceAddress=%s";
String urlParams = String.format(params, getMasterHostSpec());
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
try (Statement statement = connection.createStatement()) {
for (int i = 0; i < 3; i++) {
ResultSet resultSet = statement.executeQuery("SELECT * FROM account WHERE id = 1");
ResultSet resultSet2 = statement.executeQuery("SELECT * FROM account WHERE id = 1");
ResultSet resultSet3 = statement.executeQuery("SELECT * FROM account WHERE id = 1");
Assert.assertTrue(resultSet.next());
Assert.assertTrue(resultSet2.next());
Assert.assertTrue(resultSet3.next());
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
@Test
public void executeQueryWithoutWriteDataSourceAddressParamTest() throws Exception {
String urlParams = "?enableStatementLoadBalance=true&autoBalance=shuffle";
try (Connection connection = getConnection(urlParams)) {
ReadWriteSplittingPgConnection readWriteSplittingPgConnection =
getReadWriteSplittingPgConnection(connection);
for (int i = 0; i < 3; i++) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("SELECT * FROM account WHERE id = 1");
Assert.assertTrue(resultSet.next());
Assert.assertTrue(isRoutedToReadHostSpecs(readWriteSplittingPgConnection));
}
}
}
}
private HostSpec getNextExpectedRoundRobinSpec() {
return readHostSpecs[(currentIndex++) % readHostSpecs.length];
}
private static boolean isRoutedToReadHostSpecs(ReadWriteSplittingPgConnection readWriteSplittingPgConnection)
throws SQLException {
String socketAddress =
readWriteSplittingPgConnection.getConnectionManager().getCurrentConnection().getSocketAddress();
for (HostSpec readHostSpec : getReadHostSpecs()) {
if (socketAddress.endsWith(getHostOrAlias(readHostSpec) + ":" + readHostSpec.getPort())) {
return true;
}
}
return false;
}
private static HostSpec getRoutedReadHostSpec(ReadWriteSplittingPgConnection readWriteSplittingPgConnection)
throws SQLException {
String socketAddress =
readWriteSplittingPgConnection.getConnectionManager().getCurrentConnection().getSocketAddress();
for (HostSpec readHostSpec : getReadHostSpecs()) {
if (socketAddress.endsWith(getHostOrAlias(readHostSpec) + ":" + readHostSpec.getPort())) {
return readHostSpec;
}
}
throw new IllegalStateException("Must routed to one read host spec");
}
private static boolean isRoutedToWriteHostSpecs(ReadWriteSplittingPgConnection readWriteSplittingPgConnection)
throws SQLException {
String socketAddress =
readWriteSplittingPgConnection.getConnectionManager().getCurrentConnection().getSocketAddress();
return socketAddress.endsWith(getHostOrAlias(writeHostSpec) + ":" + writeHostSpec.getPort());
}
private static String getHostOrAlias(HostSpec readHostSpec) {
String host = readHostSpec.getHost();
return host.equalsIgnoreCase("localhost") ? "127.0.0.1" : host;
}
}