diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index e987faa008..993de8eb47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -47,6 +47,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; @@ -59,6 +60,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.util.List; +import java.util.UUID; /** * Process one mysql connection, receive one packet, process, send one packet. @@ -461,7 +463,14 @@ public class ConnectProcessor { // 0 for compatibility. int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0; executor = new StmtExecutor(ctx, new OriginStatement(request.getSql(), idx), true); - executor.execute(); + TUniqueId queryId; // This query id will be set in ctx + if (request.isSetQueryId()) { + queryId = request.getQueryId(); + } else { + UUID uuid = UUID.randomUUID(); + queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + } + executor.execute(queryId); } catch (IOException e) { // Client failed. LOG.warn("Process one query failed because IOException: ", e); @@ -475,8 +484,12 @@ public class ConnectProcessor { // no matter the master execute success or fail, the master must transfer the result to follower // and tell the follower the current journalID. TMasterOpResult result = new TMasterOpResult(); - if (ctx.queryId() != null) { - result.setQueryId(ctx.queryId); + if (ctx.queryId() != null && + // If none master FE not set query id or query id was reset in StmtExecutor when a query exec more than once, + // return it to none master FE. + (!request.isSetQueryId() || !request.getQueryId().equals(ctx.queryId())) + ) { + result.setQueryId(ctx.queryId()); } result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue()); result.setPacket(getResultPacket()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 8f0854e922..90a7376e5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -95,6 +95,9 @@ public class MasterOpExecutor { queryOptions.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS()); queryOptions.setLoadMemLimit(ctx.getSessionVariable().getLoadMemLimit()); params.setQueryOptions(queryOptions); + if (null != ctx.queryId()) { + params.setQueryId(ctx.queryId()); + } LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); @@ -110,6 +113,7 @@ public class MasterOpExecutor { if (e.getType() == TTransportException.TIMED_OUT) { throw e; } else { + LOG.warn("Forward statement "+ ctx.getStmtId() +" to Master " + thriftAddress + " twice", e); result = client.forward(params); isReturnToPool = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 349b9a75ac..c894048a6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -235,17 +235,25 @@ public class StmtExecutor { return parsedStmt; } - // Execute one statement. + // query with a random sql + public void execute() throws Exception { + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + execute(queryId); + } + + // Execute one statement with queryId + // The queryId will be set in ConnectContext + // This queryId will also be send to master FE for exec master only query. + // query id in ConnectContext will be changed when retry exec a query or master FE return a different one. // Exception: // IOException: talk with client failed. - public void execute() throws Exception { + public void execute(TUniqueId queryId) throws Exception { plannerProfile.setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); - // set query id - UUID uuid = UUID.randomUUID(); - context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + context.setQueryId(queryId); try { // support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3); @@ -265,6 +273,8 @@ public class StmtExecutor { if (isForwardToMaster()) { forwardToMaster(); if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) { + // If the query id changed in master, we set it in context. + // WARN: when query timeout, this code may not be reach. context.setQueryId(masterOpExecutor.getQueryId()); } return; @@ -279,8 +289,10 @@ public class StmtExecutor { try { //reset query id for each retry if (i > 0) { - uuid = UUID.randomUUID(); - context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + UUID uuid = UUID.randomUUID(); + TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + LOG.warn("Query {} {} times with new query id: {}", DebugUtil.printId(queryId), i, newQueryId); + context.setQueryId(newQueryId); } handleQueryStmt(); if (context.getSessionVariable().isReportSucc()) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0b3ea12a66..445d0279ee 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -442,6 +442,7 @@ struct TMasterOpRequest { 14: optional Types.TUserIdentity current_user_ident 15: optional i32 stmtIdx // the idx of the sql in multi statements 16: optional PaloInternalService.TQueryOptions query_options + 17: optional Types.TUniqueId query_id // when this is a query, we translate this query id to master } struct TColumnDefinition {