[enhancement](bulk-load) cancel loading tasks directly without retrying when timeout exceeded (#28666)

This commit is contained in:
Siyang Tang
2023-12-24 21:57:31 +08:00
committed by GitHub
parent 1e44a4b145
commit 2f7deb3dea
2 changed files with 18 additions and 14 deletions

View File

@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@ -124,7 +125,7 @@ public abstract class BulkLoadJob extends LoadJob {
break;
case SPARK:
bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), stmt.getResourceDesc(),
stmt.getOrigStmt(), stmt.getUserInfo());
stmt.getOrigStmt(), stmt.getUserInfo());
break;
case MINI:
case DELETE:
@ -209,7 +210,10 @@ public abstract class BulkLoadJob extends LoadJob {
if (loadTask == null) {
return;
}
if (loadTask.getRetryTime() <= 0) {
Predicate<LoadTask> isTaskTimeout =
(LoadTask task) -> task instanceof LoadLoadingTask
&& ((LoadLoadingTask) task).getLeftTimeMs() <= 0;
if (loadTask.getRetryTime() <= 0 || isTaskTimeout.test(loadTask)) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;

View File

@ -110,7 +110,7 @@ public class LoadLoadingTask extends LoadTask {
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
int fileNum, UserIdentity userInfo) throws UserException {
int fileNum, UserIdentity userInfo) throws UserException {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
@ -154,23 +154,23 @@ public class LoadLoadingTask extends LoadTask {
* here we use exec_mem_limit to directly override the load_mem_limit property.
*/
curCoordinator.setLoadMemLimit(execMemLimit);
curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
long leftTimeMs = getLeftTimeMs();
if (leftTimeMs <= 0) {
throw new LoadException("failed to execute loading task when timeout");
}
int timeoutS = (int) (leftTimeMs / 1000);
curCoordinator.setTimeout(timeoutS);
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
actualExecute(curCoordinator);
actualExecute(curCoordinator, timeoutS);
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
}
private void actualExecute(Coordinator curCoordinator) throws Exception {
int waitSecond = (int) (getLeftTimeMs() / 1000);
if (waitSecond <= 0) {
throw new LoadException("failed to execute plan when the left time is less than 0");
}
private void actualExecute(Coordinator curCoordinator, int waitSecond) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("task_id", signature)
@ -199,8 +199,8 @@ public class LoadLoadingTask extends LoadTask {
}
}
private long getLeftTimeMs() {
return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L);
public long getLeftTimeMs() {
return jobDeadlineMs - System.currentTimeMillis();
}
private void createProfile(Coordinator coord) {