!148 增加adaptiveSetSQLType控制参数支持JDBC batch模式下OID刷新
Merge pull request !148 from travelliu/kettle
This commit is contained in:
@ -551,7 +551,10 @@ public enum PGProperty {
|
|||||||
*/
|
*/
|
||||||
MASTER_FAILURE_HEARTBEAT_TIMEOUT("masterFailureHeartbeatTimeout", "30000", "In the scenario where heartbeat maintenance is enabled for the active node, " +
|
MASTER_FAILURE_HEARTBEAT_TIMEOUT("masterFailureHeartbeatTimeout", "30000", "In the scenario where heartbeat maintenance is enabled for the active node, " +
|
||||||
"if the active node is down, set the timeout threshold for searching for the active node. If the active node is not detected within this timeout period, " +
|
"if the active node is down, set the timeout threshold for searching for the active node. If the active node is not detected within this timeout period, " +
|
||||||
"the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node.")
|
"the cluster is considered to have no active node and no maintenance is performed on the current cluster. This time should include the RTO time of the active node."),
|
||||||
|
|
||||||
|
ADAPTIVE_SET_SQL_TYPE("adaptiveSetSQLType","false","Adaptively modify the inconsistent set sqlType in batch mode. If the first set sqlType is INTEGER and the second set is LONG, " +
|
||||||
|
"the first one will be automatically modify to LONG")
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
|
|||||||
@ -236,4 +236,8 @@ public interface BaseConnection extends PGConnection, Connection {
|
|||||||
* @return AtomicReferenceFieldUpdater<PgStatement, TimerTask>
|
* @return AtomicReferenceFieldUpdater<PgStatement, TimerTask>
|
||||||
*/
|
*/
|
||||||
AtomicReferenceFieldUpdater<PgStatement, TimerTask> getTimerUpdater();
|
AtomicReferenceFieldUpdater<PgStatement, TimerTask> getTimerUpdater();
|
||||||
|
|
||||||
|
public boolean IsBatchInsert();
|
||||||
|
|
||||||
|
public boolean isAdaptiveSetSQLType();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -244,4 +244,8 @@ public interface ParameterList {
|
|||||||
Object[] getValues();
|
Object[] getValues();
|
||||||
|
|
||||||
void bindRegisterOutParameter(int index,int oid, boolean isACompatibilityFunction) throws SQLException;
|
void bindRegisterOutParameter(int index,int oid, boolean isACompatibilityFunction) throws SQLException;
|
||||||
|
|
||||||
|
int getTypeOID(int index);
|
||||||
|
|
||||||
|
void setTypeOID(int index, int oid);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -214,4 +214,12 @@ class CompositeParameterList implements V3ParameterList {
|
|||||||
subparams[sub].setBlob(index - offsets[sub], stream, length);
|
subparams[sub].setBlob(index - offsets[sub], stream, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getTypeOID(int index) {
|
||||||
|
int[] oids = this.getTypeOIDs();
|
||||||
|
return oids[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTypeOID(int index, int oid) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -365,10 +365,17 @@ class SimpleParameterList implements V3ParameterList {
|
|||||||
// Package-private V3 accessors
|
// Package-private V3 accessors
|
||||||
//
|
//
|
||||||
|
|
||||||
int getTypeOID(int index) {
|
public int getTypeOID(int index) {
|
||||||
return paramTypes[index - 1];
|
return paramTypes[index - 1];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setTypeOID(int index, int oid) {
|
||||||
|
if (index < 1 || index > paramTypes.length) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
paramTypes[index - 1] = oid;
|
||||||
|
}
|
||||||
|
|
||||||
boolean hasUnresolvedTypes() {
|
boolean hasUnresolvedTypes() {
|
||||||
for (int paramType : paramTypes) {
|
for (int paramType : paramTypes) {
|
||||||
if (paramType == Oid.UNSPECIFIED) {
|
if (paramType == Oid.UNSPECIFIED) {
|
||||||
|
|||||||
@ -194,6 +194,7 @@ public class PgConnection implements BaseConnection {
|
|||||||
private final String xmlFactoryFactoryClass;
|
private final String xmlFactoryFactoryClass;
|
||||||
private PGXmlFactoryFactory xmlFactoryFactory;
|
private PGXmlFactoryFactory xmlFactoryFactory;
|
||||||
private String socketAddress;
|
private String socketAddress;
|
||||||
|
private boolean adaptiveSetSQLType = false;
|
||||||
final CachedQuery borrowQuery(String sql) throws SQLException {
|
final CachedQuery borrowQuery(String sql) throws SQLException {
|
||||||
return queryExecutor.borrowQuery(sql);
|
return queryExecutor.borrowQuery(sql);
|
||||||
}
|
}
|
||||||
@ -456,6 +457,8 @@ public class PgConnection implements BaseConnection {
|
|||||||
LOGGER.trace("WARNING, unrecognized batchmode type");
|
LOGGER.trace("WARNING, unrecognized batchmode type");
|
||||||
batchInsert = false;
|
batchInsert = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
adaptiveSetSQLType = PGProperty.ADAPTIVE_SET_SQL_TYPE.getBoolean(info);
|
||||||
|
|
||||||
initClientLogic(info);
|
initClientLogic(info);
|
||||||
}
|
}
|
||||||
@ -2078,5 +2081,13 @@ public class PgConnection implements BaseConnection {
|
|||||||
return CANCEL_TIMER_UPDATER;
|
return CANCEL_TIMER_UPDATER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean IsBatchInsert() {
|
||||||
|
return this.batchInsert;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAdaptiveSetSQLType() {
|
||||||
|
return this.adaptiveSetSQLType;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -63,13 +63,8 @@ import java.time.LocalDate;
|
|||||||
import java.time.LocalTime;
|
import java.time.LocalTime;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Calendar;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.TimeZone;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.Locale;
|
|
||||||
|
|
||||||
class PgPreparedStatement extends PgStatement implements PreparedStatement {
|
class PgPreparedStatement extends PgStatement implements PreparedStatement {
|
||||||
protected final CachedQuery preparedQuery; // Query fragments for prepared statement.
|
protected final CachedQuery preparedQuery; // Query fragments for prepared statement.
|
||||||
@ -287,9 +282,34 @@ class PgPreparedStatement extends PgStatement implements PreparedStatement {
|
|||||||
preparedParameters.saveLiteralValueForClientLogic(parameterIndex, Long.toString(x));
|
preparedParameters.saveLiteralValueForClientLogic(parameterIndex, Long.toString(x));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
switchOidIntToInt8(parameterIndex);
|
||||||
bindLiteral(parameterIndex, Long.toString(x), Oid.INT8);
|
bindLiteral(parameterIndex, Long.toString(x), Oid.INT8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isBatchMode() {
|
||||||
|
if (batchParameters == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!connection.isAdaptiveSetSQLType()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return connection.IsBatchInsert() && batchParameters.size() >= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void switchOidIntToInt8(int parameterIndex) {
|
||||||
|
switchOid(parameterIndex, Oid.INT4, Oid.INT8);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void switchOid(int parameterIndex, int oldOid, int newOid) {
|
||||||
|
if (!isBatchMode()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ParameterList parameter = batchParameters.get(0);
|
||||||
|
if (parameter.getTypeOID(parameterIndex) == oldOid && newOid == Oid.INT8) {
|
||||||
|
parameter.setTypeOID(parameterIndex, newOid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void setFloat(int parameterIndex, float x) throws SQLException {
|
public void setFloat(int parameterIndex, float x) throws SQLException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
if (connection.binaryTransferSend(Oid.FLOAT4)) {
|
if (connection.binaryTransferSend(Oid.FLOAT4)) {
|
||||||
|
|||||||
@ -0,0 +1,45 @@
|
|||||||
|
package org.postgresql.test.jdbc2;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.postgresql.test.TestUtil;
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
public class AdaptiveSetTypeTest extends BaseTest4{
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
TestUtil.createTable(con, "test_numeric", "f_member_id character(6) NOT NULL,f_register_capital numeric(18,0)");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws SQLException {
|
||||||
|
TestUtil.dropTable(con, "test_numeric");
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected void updateProperties(Properties props) {
|
||||||
|
props.setProperty("adaptiveSetSQLType","true");
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void AdaptiveSetTypeTrue() throws SQLException {
|
||||||
|
PreparedStatement ps = null;
|
||||||
|
Long a = new Long("2180000000");
|
||||||
|
try {
|
||||||
|
ps = con.prepareStatement("INSERT INTO test_numeric (F_MEMBER_ID,F_REGISTER_CAPITAL) VALUES ( ?, ?)");
|
||||||
|
ps.setString(1,"2097 ");
|
||||||
|
ps.setNull(2, Types.INTEGER);
|
||||||
|
ps.addBatch();
|
||||||
|
ps.setString(1,"3020 " );
|
||||||
|
ps.setLong(2,a);
|
||||||
|
ps.addBatch();
|
||||||
|
ps.executeBatch();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}finally {
|
||||||
|
TestUtil.closeQuietly(ps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user