From 2f7deb3dea6e5d8514d0492cb4145ddb18a948ca Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Sun, 24 Dec 2023 21:57:31 +0800 Subject: [PATCH] [enhancement](bulk-load) cancel loading tasks directly without retrying when timeout exceeded (#28666) --- .../apache/doris/load/loadv2/BulkLoadJob.java | 8 +++++-- .../doris/load/loadv2/LoadLoadingTask.java | 24 +++++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index d5b173d82f..7a9f160b3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.RejectedExecutionException; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -124,7 +125,7 @@ public abstract class BulkLoadJob extends LoadJob { break; case SPARK: bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), stmt.getResourceDesc(), - stmt.getOrigStmt(), stmt.getUserInfo()); + stmt.getOrigStmt(), stmt.getUserInfo()); break; case MINI: case DELETE: @@ -209,7 +210,10 @@ public abstract class BulkLoadJob extends LoadJob { if (loadTask == null) { return; } - if (loadTask.getRetryTime() <= 0) { + Predicate isTaskTimeout = + (LoadTask task) -> task instanceof LoadLoadingTask + && ((LoadLoadingTask) task).getLeftTimeMs() <= 0; + if (loadTask.getRetryTime() <= 0 || isTaskTimeout.test(loadTask)) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index fa2eadcfa6..97abfb0db9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -110,7 +110,7 @@ public class LoadLoadingTask extends LoadTask { } public void init(TUniqueId loadId, List> fileStatusList, - int fileNum, UserIdentity userInfo) throws UserException { + int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, @@ -154,23 +154,23 @@ public class LoadLoadingTask extends LoadTask { * here we use exec_mem_limit to directly override the load_mem_limit property. */ curCoordinator.setLoadMemLimit(execMemLimit); - curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); - curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode); + + long leftTimeMs = getLeftTimeMs(); + if (leftTimeMs <= 0) { + throw new LoadException("failed to execute loading task when timeout"); + } + int timeoutS = (int) (leftTimeMs / 1000); + curCoordinator.setTimeout(timeoutS); try { QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); - actualExecute(curCoordinator); + actualExecute(curCoordinator, timeoutS); } finally { QeProcessorImpl.INSTANCE.unregisterQuery(loadId); } } - private void actualExecute(Coordinator curCoordinator) throws Exception { - int waitSecond = (int) (getLeftTimeMs() / 1000); - if (waitSecond <= 0) { - throw new LoadException("failed to execute plan when the left time is less than 0"); - } - + private void actualExecute(Coordinator curCoordinator, int waitSecond) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) .add("task_id", signature) @@ -199,8 +199,8 @@ public class LoadLoadingTask extends LoadTask { } } - private long getLeftTimeMs() { - return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L); + public long getLeftTimeMs() { + return jobDeadlineMs - System.currentTimeMillis(); } private void createProfile(Coordinator coord) {