From 8eb413fa699fe4c981e1e0e01be7eb781e2ed56f Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 22 Feb 2020 22:01:14 +0800 Subject: [PATCH] [Bug][RoutineLoad] Fix bug that routine Load encounter "label already used" exception (#2959) This CL modify 2 things: 1. When a routine load task submit failed, it will not be put back to the task queue. 2. The rpc timeout when executing a routine load task in BE is set to `query_timeout` of the task plan. ISSUE: #2964 --- be/src/exec/tablet_sink.cpp | 2 +- .../doris/analysis/ShowRoutineLoadStmt.java | 1 + .../load/routineload/RoutineLoadJob.java | 7 ++++ .../routineload/RoutineLoadTaskScheduler.java | 41 +++++++++++-------- .../org/apache/doris/task/StreamLoadTask.java | 1 + 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 8b32e71d34..54ec061287 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -79,7 +79,7 @@ Status NodeChannel::init(RuntimeState* state) { _add_batch_request.set_index_id(_index_id); _add_batch_request.set_sender_id(_parent->_sender_id); - _rpc_timeout_ms = config::tablet_writer_rpc_timeout_sec * 1000; + _rpc_timeout_ms = state->query_options().query_timeout * 1000; return Status::OK(); } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 2186e5db7a..262f06f8d8 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -83,6 +83,7 @@ public class ShowRoutineLoadStmt extends ShowStmt { .add("Progress") .add("ReasonOfStateChanged") .add("ErrorLogUrls") + .add("OtherMsg") .build(); private final LabelName labelName; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0a924acf9e..0914524e98 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -173,6 +173,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; + // some other msg which need to show to user; + protected String otherMsg = ""; protected String pauseReason = ""; protected String cancelReason = ""; @@ -316,6 +318,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return dbId; } + public void setOtherMsg(String otherMsg) { + this.otherMsg = Strings.nullToEmpty(otherMsg); + } + public String getDbFullName() throws MetaNotFoundException { Database database = Catalog.getCurrentCatalog().getDb(dbId); if (database == null) { @@ -1095,6 +1101,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl row.add(""); } row.add(Joiner.on(", ").join(errorLogUrls)); + row.add(otherMsg); return row; } finally { readUnlock(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 1fdfa1ec21..9e280bc654 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -177,10 +177,20 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { throw e; } - if (!submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask)) { - // submit failed. push it back to the queue to wait next scheduling - routineLoadTaskInfo.setBeId(-1); - needScheduleTasksQueue.put(routineLoadTaskInfo); + try { + submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask); + } catch (LoadException e) { + // submit task failed (such as TOO_MANY_TASKS error), but txn has already begun. + // Here we will still set the ExecuteStartTime of this task, which means + // we "assume" that this task has been successfully submitted. + // And this task will then be aborted because of a timeout. + // In this way, we can prevent the entire job from being paused due to submit errors, + // and we can also relieve the pressure on BE by waiting for the timeout period. + LOG.warn("failed to submit routine load task {} to BE: {}", + DebugUtil.printId(routineLoadTaskInfo.getId()), + routineLoadTaskInfo.getBeId()); + routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage()); + // fall through to set ExecuteStartTime } // set the executeStartTimeMs of task @@ -208,32 +218,28 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { LOG.debug("total tasks num in routine load task queue: {}", needScheduleTasksQueue.size()); } - private boolean submitTask(long beId, TRoutineLoadTask tTask) { + private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException { Backend backend = Catalog.getCurrentSystemInfo().getBackend(beId); if (backend == null) { - LOG.warn("failed to send tasks to backend {} because not exist", beId); - return false; + throw new LoadException("failed to send tasks to backend " + beId + " because not exist"); } TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + boolean ok = false; BackendService.Client client = null; try { client = ClientPool.backendPool.borrowObject(address); TStatus tStatus = client.submit_routine_load_task(Lists.newArrayList(tTask)); ok = true; - - if (tStatus.getStatus_code() == TStatusCode.OK) { - LOG.debug("send routine load task {} to BE: {}", DebugUtil.printId(tTask.id), beId); - return true; - } else { - LOG.info("failed to submit task {}, BE: {}, error code: {}", - DebugUtil.printId(tTask.getId()), beId, tStatus.getStatus_code()); - return false; + + if (tStatus.getStatus_code() != TStatusCode.OK) { + throw new LoadException("failed to submit task. error code: " + tStatus.getStatus_code() + + ", msg: " + (tStatus.getError_msgsSize() > 0 ? tStatus.getError_msgs().get(0) : "NaN")); } + LOG.debug("send routine load task {} to BE: {}", DebugUtil.printId(tTask.id), beId); } catch (Exception e) { - LOG.warn("task send error. backend[{}]", beId, e); - return false; + throw new LoadException("failed to send task: " + e.getMessage(), e); } finally { if (ok) { ClientPool.backendPool.returnObject(address, client); @@ -241,7 +247,6 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { ClientPool.backendPool.invalidateObject(address, client); } } - } // try to allocate a task to BE which has idle slot. diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index 3afa2b37bb..067da4a5fb 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -186,6 +186,7 @@ public class StreamLoadTask { partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions()); strictMode = routineLoadJob.isStrictMode(); timezone = routineLoadJob.getTimezone(); + timeout = (int) routineLoadJob.getMaxBatchIntervalS() * 2; } // used for stream load