[enhancement](querycancel) print detail message when query is cancelled (#38859)

## Proposed changes

Issue Number: close #xxx

When a query is cancelled, the error surfaced to users and logs was previously a generic "cancelled"/boolean signal. This change threads a detailed, human-readable reason through the cancel path:

- `Status` gains a constructor that takes a Log4j-style `{}` message pattern plus arguments and formats it with `ParameterizedMessage`.
- `Coordinator.shouldCancel()` returns a `Status` describing why the query should be cancelled instead of a bare `boolean`.
- `CoordInterface.cancel()` (and its implementations) now accept an `errorMsg`, so callers such as `QueryCancelWorker` and `StmtExecutor` can propagate the detail into the query's final cancelled status. A sketch of the resulting flow is shown below.
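A minimal sketch of the resulting flow, based on the hunks in this commit (variable names such as `backend`, `coordinator`, and `allBackends` are illustrative, not taken from the diff):

```java
// The new Status constructor formats "{}" placeholders via Log4j's ParameterizedMessage.
Status errorStatus = new Status(TStatusCode.CANCELLED,
        "Backend {} not exists or dead, query {} should be cancelled",
        backend.toString(), DebugUtil.printId(queryId));

// shouldCancel() now reports the reason instead of a boolean, and the caller
// forwards that reason into cancel(), where it ends up in the query's final status.
Status status = coordinator.shouldCancel(allBackends);
if (!status.ok()) {
    coordinator.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
}
```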

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Authored by yiguolei on 2024-08-05 14:47:03 +08:00; committed by GitHub.
Parent: 808397e0d2
Commit: 5dfc5d2c77
7 changed files with 47 additions and 27 deletions

Status.java

@@ -21,6 +21,8 @@ import org.apache.doris.proto.Types.PStatus;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.message.ParameterizedMessage;
public class Status {
public static final Status OK = new Status();
public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
@@ -43,6 +45,11 @@ public class Status {
this.errorMsg = errorMsg;
}
public Status(TStatusCode code, final String errorMsg, final Object...params) {
this.errorCode = code;
this.errorMsg = ParameterizedMessage.format(errorMsg, params);
}
public Status(final TStatus status) {
this.errorCode = status.status_code;
if (status.isSetErrorMsgs()) {
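For reference, the new constructor relies on `ParameterizedMessage.format(String, Object[])` from Log4j, which substitutes each `{}` placeholder with the corresponding argument in order. A small sketch with made-up values (the backend address and query id below are illustrative only):

```java
import org.apache.logging.log4j.message.ParameterizedMessage;

String msg = ParameterizedMessage.format(
        "Backend {} not exists or dead, query {} should be cancelled",
        new Object[] {"10.0.0.1:9060", "a1b2c3d4-0"});
// msg: "Backend 10.0.0.1:9060 not exists or dead, query a1b2c3d4-0 should be cancelled"
```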

CoordInterface.java

@@ -25,7 +25,7 @@ public interface CoordInterface {
public RowBatch getNext() throws Exception;
public void cancel(Types.PPlanFragmentCancelReason cancelReason);
public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg);
// When call exec or get next data finished, should call this method to release
// some resource.

Coordinator.java

@@ -1443,7 +1443,7 @@ public class Coordinator implements CoordInterface {
// We use a very conservative cancel strategy.
// 0. If backends has zero process epoch, do not cancel. Zero process epoch usually arises in cluster upgrading.
// 1. If process epoch is same, do not cancel. Means backends does not restart or die.
public boolean shouldCancel(List<Backend> currentBackends) {
public Status shouldCancel(List<Backend> currentBackends) {
Map<Long, Backend> curBeMap = Maps.newHashMap();
for (Backend be : currentBackends) {
curBeMap.put(be.getId(), be);
@@ -1456,21 +1456,24 @@ public class Coordinator implements CoordInterface {
for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
Backend be = curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled",
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Backend {} not exists or dead, query {} should be cancelled",
pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
return true;
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
}
// Backend process epoch changed, indicates that this be restarts, query should be cancelled.
// Check zero since during upgrading, older version oplog will not persistent be start time
// so newer version follower will get zero epoch when replaying oplog or snapshot
if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Backend process epoch changed, previous {} now {}, "
+ "means this be has already restarted, should cancel this coordinator,"
+ " query id {}",
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
DebugUtil.printId(queryId));
return true;
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Backend process epoch changed, previous {} now {}, "
+ "means this be has already restarted, should cancel this coordinator,"
+ "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
DebugUtil.printId(queryId));
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
be.toString());
@@ -1481,23 +1484,27 @@ public class Coordinator implements CoordInterface {
for (BackendExecStates beExecState : beToExecStates.values()) {
Backend be = curBeMap.get(beExecState.beId);
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled.",
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Backend {} not exists or dead, query {} should be cancelled.",
beExecState.beId, DebugUtil.printId(queryId));
return true;
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
}
if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, "
+ "should cancel this coordinator, query id {}",
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Process epoch changed, previous {} now {}, means this be has already restarted,"
+ "should cancel this coordinator, query id {}",
beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
return true;
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
}
}
}
return false;
return Status.OK;
} finally {
unlock();
}
@@ -1507,14 +1514,14 @@ public class Coordinator implements CoordInterface {
// fragment,
// if any, as well as all plan fragments on remote nodes.
public void cancel() {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
if (queueToken != null) {
queueToken.signalForCancel();
}
}
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
@@ -1527,7 +1534,7 @@ public class Coordinator implements CoordInterface {
DebugUtil.printId(queryId), queryStatus.toString(),
new Exception("cancel failed"));
} else {
queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
}
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
DebugUtil.printId(queryId), cancelReason.toString());

PointQueryExec.java

@@ -169,7 +169,7 @@ public class PointQueryExec implements CoordInterface {
}
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
// Do nothing
}

PointQueryExecutor.java

@@ -163,7 +163,7 @@ public class PointQueryExecutor implements CoordInterface {
}
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
// Do nothing
}

QueryCancelWorker.java

@@ -17,6 +17,7 @@
package org.apache.doris.qe;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
@@ -36,10 +37,11 @@ public class QueryCancelWorker extends MasterDaemon {
List<Backend> allBackends = systemInfoService.getAllBackends();
for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) {
if (co.shouldCancel(allBackends)) {
Status status = co.shouldCancel(allBackends);
if (!status.ok()) {
// TODO(zhiqiang): We need more clear cancel message, so that user can figure out what happened
// by searching log.
co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
}
}
}

StmtExecutor.java

@@ -103,6 +103,7 @@ import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.NereidsSqlCacheManager;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.profile.Profile;
@@ -1453,7 +1454,7 @@ public class StmtExecutor {
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel(cancelReason);
coordRef.cancel(cancelReason, "");
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
@@ -1874,8 +1875,11 @@ public class StmtExecutor {
// notify all be cancel running fragment
// in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage());
coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR,
"cancel fragment query_id:{} cause {}",
DebugUtil.printId(context.queryId()), e.getMessage());
LOG.warn(internalErrorSt.getErrorMsg());
coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, internalErrorSt.getErrorMsg());
throw e;
} finally {
coordBase.close();
@@ -2257,7 +2261,7 @@ public class StmtExecutor {
}
boolean notTimeout = coord.join(execTimeout);
if (!coord.isDone()) {
coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT, "timeout");
if (notTimeout) {
errMsg = coord.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("There exists unhealthy backend. "