feat: add adaptiveSetSQLType property

Adaptively modify the inconsistent set sqlType in batch mode. If the first set sqlType is INTEGER and the second set is LONG  the first one will be automatically modify to LONG
This commit is contained in:
travelliu
2023-08-18 10:40:56 +08:00
parent d2ee82c3e3
commit 5fcd116227
8 changed files with 110 additions and 8 deletions

View File

@ -546,7 +546,10 @@ public enum PGProperty {
*/
MASTER_FAILURE_HEARTBEAT_TIMEOUT("masterFailureHeartbeatTimeout", "30000", "In the scenario where heartbeat maintenance is enabled for the active node, " +
"if the active node is down, set the timeout threshold for searching for the active node. If the active node is not detected within this timeout period, " +
"the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node.")
"the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node."),
ADAPTIVE_SET_SQL_TYPE("adaptiveSetSQLType","false","Adaptively modify the inconsistent set sqlType in batch mode. If the first set sqlType is INTEGER and the second set is LONG, " +
"the first one will be automatically modify to LONG")
;

View File

@ -236,4 +236,8 @@ public interface BaseConnection extends PGConnection, Connection {
* @return AtomicReferenceFieldUpdater<PgStatement, TimerTask>
*/
AtomicReferenceFieldUpdater<PgStatement, TimerTask> getTimerUpdater();
public boolean IsBatchInsert();
public boolean isAdaptiveSetSQLType();
}

View File

@ -244,4 +244,8 @@ public interface ParameterList {
Object[] getValues();
void bindRegisterOutParameter(int index,int oid, boolean isACompatibilityFunction) throws SQLException;
int getTypeOID(int index);
void setTypeOID(int index, int oid);
}

View File

@ -214,4 +214,12 @@ class CompositeParameterList implements V3ParameterList {
subparams[sub].setBlob(index - offsets[sub], stream, length);
}
public int getTypeOID(int index) {
int[] oids = this.getTypeOIDs();
return oids[index];
}
public void setTypeOID(int index, int oid) {
return;
}
}

View File

@ -365,10 +365,17 @@ class SimpleParameterList implements V3ParameterList {
// Package-private V3 accessors
//
int getTypeOID(int index) {
public int getTypeOID(int index) {
return paramTypes[index - 1];
}
public void setTypeOID(int index, int oid) {
if (index < 1 || index > paramTypes.length) {
return;
}
paramTypes[index - 1] = oid;
}
boolean hasUnresolvedTypes() {
for (int paramType : paramTypes) {
if (paramType == Oid.UNSPECIFIED) {

View File

@ -194,6 +194,7 @@ public class PgConnection implements BaseConnection {
private final String xmlFactoryFactoryClass;
private PGXmlFactoryFactory xmlFactoryFactory;
private String socketAddress;
private boolean adaptiveSetSQLType = false;
final CachedQuery borrowQuery(String sql) throws SQLException {
return queryExecutor.borrowQuery(sql);
}
@ -456,6 +457,8 @@ public class PgConnection implements BaseConnection {
batchInsert = false;
}
adaptiveSetSQLType = PGProperty.ADAPTIVE_SET_SQL_TYPE.getBoolean(info);
initClientLogic(info);
}
@ -2077,5 +2080,13 @@ public class PgConnection implements BaseConnection {
return CANCEL_TIMER_UPDATER;
}
@Override
public boolean IsBatchInsert() {
return this.batchInsert;
}
public boolean isAdaptiveSetSQLType() {
return this.adaptiveSetSQLType;
}
}

View File

@ -63,13 +63,8 @@ import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Locale;
class PgPreparedStatement extends PgStatement implements PreparedStatement {
protected final CachedQuery preparedQuery; // Query fragments for prepared statement.
@ -287,9 +282,34 @@ class PgPreparedStatement extends PgStatement implements PreparedStatement {
preparedParameters.saveLiteralValueForClientLogic(parameterIndex, Long.toString(x));
return;
}
switchOidIntToInt8(parameterIndex);
bindLiteral(parameterIndex, Long.toString(x), Oid.INT8);
}
public boolean isBatchMode() {
if (batchParameters == null) {
return false;
}
if (!connection.isAdaptiveSetSQLType()) {
return false;
}
return connection.IsBatchInsert() && batchParameters.size() >= 1;
}
public void switchOidIntToInt8(int parameterIndex) {
switchOid(parameterIndex, Oid.INT4, Oid.INT8);
}
public void switchOid(int parameterIndex, int oldOid, int newOid) {
if (!isBatchMode()) {
return;
}
ParameterList parameter = batchParameters.get(0);
if (parameter.getTypeOID(parameterIndex) == oldOid && newOid == Oid.INT8) {
parameter.setTypeOID(parameterIndex, newOid);
}
}
public void setFloat(int parameterIndex, float x) throws SQLException {
checkClosed();
if (connection.binaryTransferSend(Oid.FLOAT4)) {

View File

@ -0,0 +1,45 @@
package org.postgresql.test.jdbc2;
import org.junit.After;
import org.junit.Test;
import org.postgresql.test.TestUtil;
import java.sql.*;
import java.util.Properties;
import static org.junit.Assert.fail;
public class AdaptiveSetTypeTest extends BaseTest4{
@Override
public void setUp() throws Exception {
super.setUp();
TestUtil.createTable(con, "test_numeric", "f_member_id character(6) NOT NULL,f_register_capital numeric(18,0)");
}
@After
public void tearDown() throws SQLException {
TestUtil.dropTable(con, "test_numeric");
}
@Override
protected void updateProperties(Properties props) {
props.setProperty("adaptiveSetSQLType","true");
}
@Test
public void AdaptiveSetTypeTrue() throws SQLException {
PreparedStatement ps = null;
Long a = new Long("2180000000");
try {
ps = con.prepareStatement("INSERT INTO test_numeric (F_MEMBER_ID,F_REGISTER_CAPITAL) VALUES ( ?, ?)");
ps.setString(1,"2097 ");
ps.setNull(2, Types.INTEGER);
ps.addBatch();
ps.setString(1,"3020 " );
ps.setLong(2,a);
ps.addBatch();
ps.executeBatch();
} catch (SQLException e) {
fail(e.getMessage());
}finally {
TestUtil.closeQuietly(ps);
}
}
}