!266 jdbc支持全链路跟踪

Merge pull request !266 from 蒋宏博/master
This commit is contained in:
opengauss_bot
2024-07-30 13:00:02 +00:00
committed by Gitee
4 changed files with 131 additions and 2 deletions

View File

@ -228,6 +228,19 @@ public class PGStream implements Closeable, Flushable {
pg_output.write(_int4buf);
}
/**
* Sends a Long to the back end.
*
* @param val the long to be sent
* @throws IOException if an I/O error occurs
*/
public void sendLong(long val) throws IOException {
int high = (int)(val >> 32);
int low = (int)(val & 0xffffffff);
sendInteger4(high);
sendInteger4(low);
}
/**
* Sends a 2-byte integer (short) to the back end.
*
@ -327,6 +340,18 @@ public class PGStream implements Closeable, Flushable {
| _int4buf[3] & 0xFF;
}
/**
* Receives a Long from the backend.
*
* @return the 64bit integer received from the backend
* @throws IOException if an I/O error occurs
*/
public long receiveLong() throws IOException {
long high = receiveInteger4();
long low = receiveInteger4();
return (high << 32) | low;
}
/**
* Receives a two byte integer from the backend.
*

View File

@ -90,8 +90,12 @@ public abstract class QueryExecutorBase implements QueryExecutor {
});
}
protected abstract void sendSupportTrace() throws IOException;
protected abstract void sendCloseMessage() throws IOException;
protected abstract void sendTrace() throws IOException;
@Override
public void setNetworkTimeout(int milliseconds) throws IOException {
pgStream.setNetworkTimeout(milliseconds);
@ -146,6 +150,7 @@ public abstract class QueryExecutorBase implements QueryExecutor {
try {
LOGGER.trace(" FE=> Terminate");
sendTrace();
sendCloseMessage();
pgStream.flush();
pgStream.close();

View File

@ -126,6 +126,16 @@ public class QueryExecutorImpl extends QueryExecutorBase {
private String clientEncoding;
private long sendTime;
private long recvTime;
private long netTime;
private boolean enableTrace = false;
private boolean waitNexttime = false;
/**
* {@code CommandComplete(B)} messages are quite common, so we reuse instance to parse those
*/
@ -157,6 +167,12 @@ public class QueryExecutorImpl extends QueryExecutorBase {
private static final String OFF = "off";
private static final String SERVER_SUPPORT_TRACE = "server_support_trace";
private static final int PROTOCOL3_MSGLEN_OFFSET = 4;
private static final int TIME_NS_ONE_US = 1000;
public QueryExecutorImpl(PGStream pgStream, String user, String database,
int cancelSignalTimeout, Properties info) throws SQLException, IOException {
super(pgStream, user, database, cancelSignalTimeout, info);
@ -222,6 +238,14 @@ public class QueryExecutorImpl extends QueryExecutorBase {
return this.clientEncoding;
}
public boolean getWaitNexttime() {
return waitNexttime;
}
public void setWaitNexttime(boolean waitNexttime) {
this.waitNexttime = waitNexttime;
}
/**
* When database compatibility mode is A database and the parameter overload function
* is turned on, add out parameter description message.
@ -294,6 +318,14 @@ public class QueryExecutorImpl extends QueryExecutorBase {
}
}
private void recordSendTime() {
sendTime = System.nanoTime();
}
private void recordRecvTime() {
recvTime = System.nanoTime();
}
/**
* @param holder object assumed to hold the lock
* @return whether given object actually holds the lock
@ -363,6 +395,10 @@ public class QueryExecutorImpl extends QueryExecutorBase {
}
}
private boolean isSimpleQuery(int flags) {
return (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0;
}
public synchronized void execute(Query query, ParameterList parameters, ResultHandler handler,
int maxRows, int fetchSize, int flags) throws SQLException {
waitOnLock();
@ -388,6 +424,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
boolean autosave = false;
try {
try {
recordAndSendTrace(flags);
handler = sendQueryPreamble(handler, flags);
autosave = sendAutomaticSavepoint(query, flags);
sendQuery(query, (V3ParameterList) parameters, maxRows, fetchSize, flags,
@ -546,6 +583,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
boolean autosave = false;
ResultHandler handler = batchHandler;
try {
recordAndSendTrace(flags);
handler = sendQueryPreamble(batchHandler, flags);
autosave = sendAutomaticSavepoint(queries[0], flags);
estimatedReceiveBufferBytes = 0;
@ -619,6 +657,8 @@ public class QueryExecutorImpl extends QueryExecutorBase {
boolean autosave = false;
ResultHandler handler = batchHandler;
try {
// P->DS->S U->E->S
recordAndSendTrace(flags);
handler = sendQueryPreamble(batchHandler, flags);
autosave = sendAutomaticSavepoint(queries[0], flags);
estimatedReceiveBufferBytes = 0;
@ -748,6 +788,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
};
try {
recordAndSendTrace(QueryExecutor.QUERY_NO_METADATA);
sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0,
QueryExecutor.QUERY_NO_METADATA);
sendSync();
@ -1564,6 +1605,28 @@ public class QueryExecutorImpl extends QueryExecutorBase {
}
}
//
// record sendTime and send K message.
//
private void recordAndSendTrace(int flags) throws IOException {
if (!isSimpleQuery(flags)) {
recordSendTime();
}
sendTrace();
}
//
// send netTime to database
//
protected void sendTrace() throws IOException {
if (!enableTrace) {
return;
}
pgStream.sendChar('K'); // Sync
pgStream.sendInteger4(8 + PROTOCOL3_MSGLEN_OFFSET); // Length
pgStream.sendLong(netTime);
}
//
// Message sending
//
@ -2142,11 +2205,12 @@ public class QueryExecutorImpl extends QueryExecutorBase {
//
private void sendOneQuery(SimpleQuery query, SimpleParameterList params, int maxRows,
int fetchSize, int flags) throws IOException {
boolean asSimple = (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0;
boolean asSimple = isSimpleQuery(flags);
if (asSimple) {
assert (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) == 0
: "Simple mode does not support describe requests. sql = " + query.getNativeSql()
+ ", flags = " + flags;
recordSendTime();
sendSimpleQuery(query, params);
return;
}
@ -2445,10 +2509,25 @@ public class QueryExecutorImpl extends QueryExecutorBase {
// from there.
boolean doneAfterRowDescNoData = false;
try {
boolean isRecvP = false;
while (!endQuery) {
c = pgStream.receiveChar();
recievedPacketType=c;
switch (c) {
case 'K': //receive dbTime
recordRecvTime();
int msgLen = pgStream.receiveInteger4();
// dbTime 8 bytes
assert (msgLen == 8 + PROTOCOL3_MSGLEN_OFFSET);
long dbTime = pgStream.receiveLong();
if (getWaitNexttime()) {
netTime += ((recvTime - sendTime) / TIME_NS_ONE_US) - dbTime;
} else {
netTime = ((recvTime - sendTime) / TIME_NS_ONE_US) - dbTime;
}
isRecvP = true;
break;
case 'A': // Asynchronous Notify
receiveAsyncNotify();
break;
@ -2836,7 +2915,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
throw new IOException("Unexpected packet type: " + c);
}
}
enableTrace = isRecvP;
} catch(IOException e) {
LOGGER.error("IO Exception.recieved packetType:" + recievedPacketType + "last PacketType:" + lastPacketType
+ "connection info:" + secSocketAddress + "buffer :\n" + pgStream.getInputBufferByHex());
@ -2877,6 +2956,7 @@ public class QueryExecutorImpl extends QueryExecutorBase {
processDeadParsedQueries();
processDeadPortals();
recordAndSendTrace(0);
sendExecute(portal.getQuery(), portal, fetchSize);
sendSync();
@ -3059,6 +3139,12 @@ public class QueryExecutorImpl extends QueryExecutorBase {
}
}
protected void sendSupportTrace() throws IOException {
pgStream.sendChar('V');
pgStream.sendInteger4(4);
pgStream.flush();
}
@Override
protected void sendCloseMessage() throws IOException {
pgStream.sendChar('X');
@ -3125,6 +3211,13 @@ public class QueryExecutorImpl extends QueryExecutorBase {
String name = pgStream.receiveString();
String value = pgStream.receiveString();
if (name.equals(SERVER_SUPPORT_TRACE)) {
// SERVER_SUPPORT_TRACE equals "1" means server support trace
if ("1".equals(value)) {
sendSupportTrace();
}
}
if (name.equals(CLIENT_ENCODING)) {
if (allowEncodingChanges) {
if (!value.equalsIgnoreCase("UTF8") && !value.equalsIgnoreCase("GBK")) {

View File

@ -7,6 +7,7 @@ package org.postgresql.jdbc;
import org.postgresql.Driver;
import org.postgresql.core.*;
import org.postgresql.core.v3.QueryExecutorImpl;
import org.postgresql.quickautobalance.ConnectionManager;
import org.postgresql.quickautobalance.LoadBalanceHeartBeating;
import org.postgresql.util.GT;
@ -991,6 +992,7 @@ public class PgStatement implements Statement, BaseStatement {
BatchResultHandler handler;
handler = createBatchHandler(queries, parameterLists);
QueryExecutorImpl executer = (QueryExecutorImpl) connection.getQueryExecutor();
if ((preDescribe || forceBinaryTransfers)
&& (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
// Do a client-server round trip, parsing and describing the query so we
@ -1000,6 +1002,9 @@ public class PgStatement implements Statement, BaseStatement {
StatementResultHandler handler2 = new StatementResultHandler();
try {
connection.getQueryExecutor().execute(queries[0], parameterLists[0], handler2, 0, 0, flags2);
if (connection.getQueryExecutor() instanceof QueryExecutorImpl) {
executer.setWaitNexttime(true);
}
} catch (SQLException e) {
// Unable to parse the first statement -> throw BatchUpdateException
handler.handleError(e);
@ -1046,6 +1051,7 @@ public class PgStatement implements Statement, BaseStatement {
}
}
}
executer.setWaitNexttime(false);
return handler;
}