diff --git a/pgjdbc/src/main/java/org/postgresql/core/PGStream.java b/pgjdbc/src/main/java/org/postgresql/core/PGStream.java index 21fcad7..a57c13b 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/PGStream.java +++ b/pgjdbc/src/main/java/org/postgresql/core/PGStream.java @@ -228,6 +228,19 @@ public class PGStream implements Closeable, Flushable { pg_output.write(_int4buf); } + /** + * Sends a Long to the back end. + * + * @param val the long to be sent + * @throws IOException if an I/O error occurs + */ + public void sendLong(long val) throws IOException { + int high = (int)(val >> 32); + int low = (int)(val & 0xffffffff); + sendInteger4(high); + sendInteger4(low); + } + /** * Sends a 2-byte integer (short) to the back end. * @@ -327,6 +340,18 @@ public class PGStream implements Closeable, Flushable { | _int4buf[3] & 0xFF; } + /** + * Receives a Long from the backend. + * + * @return the 64bit integer received from the backend + * @throws IOException if an I/O error occurs + */ + public long receiveLong() throws IOException { + long high = receiveInteger4(); + long low = receiveInteger4(); + return (high << 32) | low; + } + /** * Receives a two byte integer from the backend. * diff --git a/pgjdbc/src/main/java/org/postgresql/core/QueryExecutorBase.java b/pgjdbc/src/main/java/org/postgresql/core/QueryExecutorBase.java index 31bff2c..e299914 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/QueryExecutorBase.java +++ b/pgjdbc/src/main/java/org/postgresql/core/QueryExecutorBase.java @@ -90,8 +90,12 @@ public abstract class QueryExecutorBase implements QueryExecutor { }); } + protected abstract void sendSupportTrace() throws IOException; + protected abstract void sendCloseMessage() throws IOException; + protected abstract void sendTrace() throws IOException; + @Override public void setNetworkTimeout(int milliseconds) throws IOException { pgStream.setNetworkTimeout(milliseconds); @@ -146,6 +150,7 @@ public abstract class QueryExecutorBase implements QueryExecutor { try { LOGGER.trace(" FE=> Terminate"); + sendTrace(); sendCloseMessage(); pgStream.flush(); pgStream.close(); diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java index df582b2..012395d 100644 --- a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java +++ b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java @@ -126,6 +126,16 @@ public class QueryExecutorImpl extends QueryExecutorBase { private String clientEncoding; + private long sendTime; + + private long recvTime; + + private long netTime; + + private boolean enableTrace = false; + + private boolean waitNexttime = false; + /** * {@code CommandComplete(B)} messages are quite common, so we reuse instance to parse those */ @@ -157,6 +167,12 @@ public class QueryExecutorImpl extends QueryExecutorBase { private static final String OFF = "off"; + private static final String SERVER_SUPPORT_TRACE = "server_support_trace"; + + private static final int PROTOCOL3_MSGLEN_OFFSET = 4; + + private static final int TIME_NS_ONE_US = 1000; + public QueryExecutorImpl(PGStream pgStream, String user, String database, int cancelSignalTimeout, Properties info) throws SQLException, IOException { super(pgStream, user, database, cancelSignalTimeout, info); @@ -222,6 +238,14 @@ public class QueryExecutorImpl extends QueryExecutorBase { return this.clientEncoding; } + public boolean getWaitNexttime() { + return waitNexttime; + } + + public void setWaitNexttime(boolean waitNexttime) { + this.waitNexttime = waitNexttime; + } + /** * When database compatibility mode is A database and the parameter overload function * is turned on, add out parameter description message. @@ -294,6 +318,14 @@ public class QueryExecutorImpl extends QueryExecutorBase { } } + private void recordSendTime() { + sendTime = System.nanoTime(); + } + + private void recordRecvTime() { + recvTime = System.nanoTime(); + } + /** * @param holder object assumed to hold the lock * @return whether given object actually holds the lock @@ -363,6 +395,10 @@ public class QueryExecutorImpl extends QueryExecutorBase { } } + private boolean isSimpleQuery(int flags) { + return (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0; + } + public synchronized void execute(Query query, ParameterList parameters, ResultHandler handler, int maxRows, int fetchSize, int flags) throws SQLException { waitOnLock(); @@ -388,6 +424,7 @@ public class QueryExecutorImpl extends QueryExecutorBase { boolean autosave = false; try { try { + recordAndSendTrace(flags); handler = sendQueryPreamble(handler, flags); autosave = sendAutomaticSavepoint(query, flags); sendQuery(query, (V3ParameterList) parameters, maxRows, fetchSize, flags, @@ -546,6 +583,7 @@ public class QueryExecutorImpl extends QueryExecutorBase { boolean autosave = false; ResultHandler handler = batchHandler; try { + recordAndSendTrace(flags); handler = sendQueryPreamble(batchHandler, flags); autosave = sendAutomaticSavepoint(queries[0], flags); estimatedReceiveBufferBytes = 0; @@ -619,6 +657,8 @@ public class QueryExecutorImpl extends QueryExecutorBase { boolean autosave = false; ResultHandler handler = batchHandler; try { + // P->DS->S U->E->S + recordAndSendTrace(flags); handler = sendQueryPreamble(batchHandler, flags); autosave = sendAutomaticSavepoint(queries[0], flags); estimatedReceiveBufferBytes = 0; @@ -748,6 +788,7 @@ public class QueryExecutorImpl extends QueryExecutorBase { }; try { + recordAndSendTrace(QueryExecutor.QUERY_NO_METADATA); sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, QueryExecutor.QUERY_NO_METADATA); sendSync(); @@ -1564,6 +1605,28 @@ public class QueryExecutorImpl extends QueryExecutorBase { } } + // + // record sendTime and send K message. + // + private void recordAndSendTrace(int flags) throws IOException { + if (!isSimpleQuery(flags)) { + recordSendTime(); + } + sendTrace(); + } + + // + // send netTime to database + // + protected void sendTrace() throws IOException { + if (!enableTrace) { + return; + } + pgStream.sendChar('K'); // Sync + pgStream.sendInteger4(8 + PROTOCOL3_MSGLEN_OFFSET); // Length + pgStream.sendLong(netTime); + } + // // Message sending // @@ -2142,11 +2205,12 @@ public class QueryExecutorImpl extends QueryExecutorBase { // private void sendOneQuery(SimpleQuery query, SimpleParameterList params, int maxRows, int fetchSize, int flags) throws IOException { - boolean asSimple = (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0; + boolean asSimple = isSimpleQuery(flags); if (asSimple) { assert (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) == 0 : "Simple mode does not support describe requests. sql = " + query.getNativeSql() + ", flags = " + flags; + recordSendTime(); sendSimpleQuery(query, params); return; } @@ -2445,10 +2509,25 @@ public class QueryExecutorImpl extends QueryExecutorBase { // from there. boolean doneAfterRowDescNoData = false; try { + boolean isRecvP = false; while (!endQuery) { c = pgStream.receiveChar(); recievedPacketType=c; switch (c) { + case 'K': //receive dbTime + recordRecvTime(); + int msgLen = pgStream.receiveInteger4(); + // dbTime 8 bytes + assert (msgLen == 8 + PROTOCOL3_MSGLEN_OFFSET); + long dbTime = pgStream.receiveLong(); + if (getWaitNexttime()) { + netTime += ((recvTime - sendTime) / TIME_NS_ONE_US) - dbTime; + } else { + netTime = ((recvTime - sendTime) / TIME_NS_ONE_US) - dbTime; + } + isRecvP = true; + break; + case 'A': // Asynchronous Notify receiveAsyncNotify(); break; @@ -2836,7 +2915,7 @@ public class QueryExecutorImpl extends QueryExecutorBase { throw new IOException("Unexpected packet type: " + c); } } - + enableTrace = isRecvP; } catch(IOException e) { LOGGER.error("IO Exception.recieved packetType:" + recievedPacketType + "last PacketType:" + lastPacketType + "connection info:" + secSocketAddress + "buffer :\n" + pgStream.getInputBufferByHex()); @@ -2877,6 +2956,7 @@ public class QueryExecutorImpl extends QueryExecutorBase { processDeadParsedQueries(); processDeadPortals(); + recordAndSendTrace(0); sendExecute(portal.getQuery(), portal, fetchSize); sendSync(); @@ -3059,6 +3139,12 @@ public class QueryExecutorImpl extends QueryExecutorBase { } } + protected void sendSupportTrace() throws IOException { + pgStream.sendChar('V'); + pgStream.sendInteger4(4); + pgStream.flush(); + } + @Override protected void sendCloseMessage() throws IOException { pgStream.sendChar('X'); @@ -3125,6 +3211,13 @@ public class QueryExecutorImpl extends QueryExecutorBase { String name = pgStream.receiveString(); String value = pgStream.receiveString(); + if (name.equals(SERVER_SUPPORT_TRACE)) { + // SERVER_SUPPORT_TRACE equals "1" means server support trace + if ("1".equals(value)) { + sendSupportTrace(); + } + } + if (name.equals(CLIENT_ENCODING)) { if (allowEncodingChanges) { if (!value.equalsIgnoreCase("UTF8") && !value.equalsIgnoreCase("GBK")) { diff --git a/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java b/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java index 793619f..b0701b2 100644 --- a/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java +++ b/pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java @@ -7,6 +7,7 @@ package org.postgresql.jdbc; import org.postgresql.Driver; import org.postgresql.core.*; +import org.postgresql.core.v3.QueryExecutorImpl; import org.postgresql.quickautobalance.ConnectionManager; import org.postgresql.quickautobalance.LoadBalanceHeartBeating; import org.postgresql.util.GT; @@ -991,6 +992,7 @@ public class PgStatement implements Statement, BaseStatement { BatchResultHandler handler; handler = createBatchHandler(queries, parameterLists); + QueryExecutorImpl executer = (QueryExecutorImpl) connection.getQueryExecutor(); if ((preDescribe || forceBinaryTransfers) && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) { // Do a client-server round trip, parsing and describing the query so we @@ -1000,6 +1002,9 @@ public class PgStatement implements Statement, BaseStatement { StatementResultHandler handler2 = new StatementResultHandler(); try { connection.getQueryExecutor().execute(queries[0], parameterLists[0], handler2, 0, 0, flags2); + if (connection.getQueryExecutor() instanceof QueryExecutorImpl) { + executer.setWaitNexttime(true); + } } catch (SQLException e) { // Unable to parse the first statement -> throw BatchUpdateException handler.handleError(e); @@ -1046,6 +1051,7 @@ public class PgStatement implements Statement, BaseStatement { } } } + executer.setWaitNexttime(false); return handler; }