[opt](cancel) Cancel get result future immediately if query is cancelled (#31228)
This commit is contained in:
@ -24,6 +24,7 @@ import org.apache.doris.thrift.TStatusCode;
|
||||
public class Status {
|
||||
public static final Status OK = new Status();
|
||||
public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
|
||||
public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, "Timeout");
|
||||
|
||||
public TStatusCode getErrorCode() {
|
||||
return errorCode;
|
||||
|
||||
@ -1327,7 +1327,8 @@ public class Coordinator implements CoordInterface {
|
||||
Status status = new Status();
|
||||
resultBatch = receiver.getNext(status);
|
||||
if (!status.ok()) {
|
||||
LOG.warn("get next fail, need cancel. query id: {}", DebugUtil.printId(queryId));
|
||||
LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
|
||||
DebugUtil.printId(queryId), status.toString());
|
||||
}
|
||||
|
||||
updateStatus(status, null /* no instance id */);
|
||||
@ -1475,7 +1476,7 @@ public class Coordinator implements CoordInterface {
|
||||
|
||||
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
|
||||
if (null != receiver) {
|
||||
receiver.cancel();
|
||||
receiver.cancel(cancelReason.toString());
|
||||
}
|
||||
if (null != pointExec) {
|
||||
pointExec.cancel();
|
||||
@ -1487,7 +1488,7 @@ public class Coordinator implements CoordInterface {
|
||||
|
||||
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) {
|
||||
if (null != receiver) {
|
||||
receiver.cancel();
|
||||
receiver.cancel(cancelReason.toString());
|
||||
}
|
||||
if (null != pointExec) {
|
||||
pointExec.cancel();
|
||||
|
||||
@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TDeserializer;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -49,6 +50,8 @@ public class ResultReceiver {
|
||||
private Types.PUniqueId finstId;
|
||||
private Long backendId;
|
||||
private Thread currentThread;
|
||||
private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null;
|
||||
public String cancelReason = "";
|
||||
|
||||
public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) {
|
||||
this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
|
||||
@ -71,25 +74,43 @@ public class ResultReceiver {
|
||||
.build();
|
||||
|
||||
currentThread = Thread.currentThread();
|
||||
Future<InternalService.PFetchDataResult> future
|
||||
fetchDataAsyncFuture
|
||||
= BackendServiceProxy.getInstance().fetchDataAsync(address, request);
|
||||
InternalService.PFetchDataResult pResult = null;
|
||||
|
||||
while (pResult == null) {
|
||||
long currentTs = System.currentTimeMillis();
|
||||
if (currentTs >= timeoutTs) {
|
||||
throw new TimeoutException("query timeout, query id = " + DebugUtil.printId(this.queryId));
|
||||
}
|
||||
try {
|
||||
pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
|
||||
pResult = fetchDataAsyncFuture.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
|
||||
} catch (CancellationException e) {
|
||||
LOG.warn("Future of ResultReceiver of query {} is cancelled", DebugUtil.printId(this.queryId));
|
||||
if (!isCancel) {
|
||||
LOG.warn("ResultReceiver is not set to cancelled state, this should not happen");
|
||||
} else {
|
||||
status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason));
|
||||
return null;
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("Query {} get result timeout, get result duration {} ms",
|
||||
DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
|
||||
isCancel = true;
|
||||
status.setStatus(Status.TIMEOUT);
|
||||
updateCancelReason("fetch data timeout");
|
||||
return null;
|
||||
} catch (InterruptedException e) {
|
||||
// continue to get result
|
||||
LOG.info("future get interrupted Exception", e);
|
||||
LOG.warn("Future of ResultReceiver of query {} got interrupted Exception",
|
||||
DebugUtil.printId(this.queryId), e);
|
||||
if (isCancel) {
|
||||
status.setStatus(Status.CANCELLED);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
status.setPstatus(pResult.getStatus());
|
||||
@ -150,8 +171,18 @@ public class ResultReceiver {
|
||||
return rowBatch;
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
private void updateCancelReason(String reason) {
|
||||
if (this.cancelReason.isEmpty()) {
|
||||
this.cancelReason = reason;
|
||||
} else {
|
||||
LOG.warn("Query {} already has cancel reason: {}, new reason {} will be ignored",
|
||||
DebugUtil.printId(queryId), cancelReason, reason);
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel(String reason) {
|
||||
isCancel = true;
|
||||
updateCancelReason(reason);
|
||||
synchronized (this) {
|
||||
if (currentThread != null) {
|
||||
// TODO(cmy): we cannot interrupt this thread, or we may throw
|
||||
@ -160,6 +191,14 @@ public class ResultReceiver {
|
||||
// And user will lost connection to Palo
|
||||
// currentThread.interrupt();
|
||||
}
|
||||
if (fetchDataAsyncFuture != null) {
|
||||
if (fetchDataAsyncFuture.cancel(true)) {
|
||||
LOG.info("ResultReceiver of query {} is cancelled", DebugUtil.printId(queryId));
|
||||
} else {
|
||||
LOG.warn("ResultReceiver of query {} cancel failed, typically means the future is finished",
|
||||
DebugUtil.printId(queryId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user