[AuditLog] Send queryId to master FE (#5064)
For fix #4977, we return queryId in master FE when finish query for non master to audit it in #4978. But when the query fail(timeout), the client may not receive the right queryId for audit. In this PR: None master FE send queryId to master for querying; Add more log.
This commit is contained in:
@ -47,6 +47,7 @@ import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.thrift.TMasterOpRequest;
|
||||
import org.apache.doris.thrift.TMasterOpResult;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
@ -59,6 +60,7 @@ import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Process one mysql connection, receive one packet, process, send one packet.
|
||||
@ -461,7 +463,14 @@ public class ConnectProcessor {
|
||||
// 0 for compatibility.
|
||||
int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
|
||||
executor = new StmtExecutor(ctx, new OriginStatement(request.getSql(), idx), true);
|
||||
executor.execute();
|
||||
TUniqueId queryId; // This query id will be set in ctx
|
||||
if (request.isSetQueryId()) {
|
||||
queryId = request.getQueryId();
|
||||
} else {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
}
|
||||
executor.execute(queryId);
|
||||
} catch (IOException e) {
|
||||
// Client failed.
|
||||
LOG.warn("Process one query failed because IOException: ", e);
|
||||
@ -475,8 +484,12 @@ public class ConnectProcessor {
|
||||
// no matter the master execute success or fail, the master must transfer the result to follower
|
||||
// and tell the follower the current journalID.
|
||||
TMasterOpResult result = new TMasterOpResult();
|
||||
if (ctx.queryId() != null) {
|
||||
result.setQueryId(ctx.queryId);
|
||||
if (ctx.queryId() != null &&
|
||||
// If none master FE not set query id or query id was reset in StmtExecutor when a query exec more than once,
|
||||
// return it to none master FE.
|
||||
(!request.isSetQueryId() || !request.getQueryId().equals(ctx.queryId()))
|
||||
) {
|
||||
result.setQueryId(ctx.queryId());
|
||||
}
|
||||
result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue());
|
||||
result.setPacket(getResultPacket());
|
||||
|
||||
@ -95,6 +95,9 @@ public class MasterOpExecutor {
|
||||
queryOptions.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS());
|
||||
queryOptions.setLoadMemLimit(ctx.getSessionVariable().getLoadMemLimit());
|
||||
params.setQueryOptions(queryOptions);
|
||||
if (null != ctx.queryId()) {
|
||||
params.setQueryId(ctx.queryId());
|
||||
}
|
||||
|
||||
LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress);
|
||||
|
||||
@ -110,6 +113,7 @@ public class MasterOpExecutor {
|
||||
if (e.getType() == TTransportException.TIMED_OUT) {
|
||||
throw e;
|
||||
} else {
|
||||
LOG.warn("Forward statement "+ ctx.getStmtId() +" to Master " + thriftAddress + " twice", e);
|
||||
result = client.forward(params);
|
||||
isReturnToPool = true;
|
||||
}
|
||||
|
||||
@ -235,17 +235,25 @@ public class StmtExecutor {
|
||||
return parsedStmt;
|
||||
}
|
||||
|
||||
// Execute one statement.
|
||||
// query with a random sql
|
||||
public void execute() throws Exception {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
execute(queryId);
|
||||
}
|
||||
|
||||
// Execute one statement with queryId
|
||||
// The queryId will be set in ConnectContext
|
||||
// This queryId will also be send to master FE for exec master only query.
|
||||
// query id in ConnectContext will be changed when retry exec a query or master FE return a different one.
|
||||
// Exception:
|
||||
// IOException: talk with client failed.
|
||||
public void execute() throws Exception {
|
||||
public void execute(TUniqueId queryId) throws Exception {
|
||||
|
||||
plannerProfile.setQueryBeginTime();
|
||||
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
|
||||
|
||||
// set query id
|
||||
UUID uuid = UUID.randomUUID();
|
||||
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
|
||||
context.setQueryId(queryId);
|
||||
|
||||
try {
|
||||
// support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3);
|
||||
@ -265,6 +273,8 @@ public class StmtExecutor {
|
||||
if (isForwardToMaster()) {
|
||||
forwardToMaster();
|
||||
if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
|
||||
// If the query id changed in master, we set it in context.
|
||||
// WARN: when query timeout, this code may not be reach.
|
||||
context.setQueryId(masterOpExecutor.getQueryId());
|
||||
}
|
||||
return;
|
||||
@ -279,8 +289,10 @@ public class StmtExecutor {
|
||||
try {
|
||||
//reset query id for each retry
|
||||
if (i > 0) {
|
||||
uuid = UUID.randomUUID();
|
||||
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
LOG.warn("Query {} {} times with new query id: {}", DebugUtil.printId(queryId), i, newQueryId);
|
||||
context.setQueryId(newQueryId);
|
||||
}
|
||||
handleQueryStmt();
|
||||
if (context.getSessionVariable().isReportSucc()) {
|
||||
|
||||
Reference in New Issue
Block a user