From 409cee0fdbe59c4b6c5a1a728f8d5e4ccf4985e8 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 15 Jul 2021 11:21:01 +0800 Subject: [PATCH] [Bug][RoutineLoad] Fix bug that routine load thread on BE may be blocked (#6215) * [Bug][RoutineLoad] Fix bug that routine load thread on BE may be blocked This bug will cause the routine load job throw TOO MANY TASK error, and routine load job is blocked. * fix ut Co-authored-by: chenmingyu --- be/src/util/blocking_priority_queue.hpp | 5 ++++- .../org/apache/doris/planner/StreamLoadPlanner.java | 12 ++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 574ebd1bd0..9280e66d49 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -140,7 +140,10 @@ public: // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown() { - _shutdown = true; + { + std::unique_lock l(_lock); + _shutdown = true; + } _get_cv.notify_all(); _put_cv.notify_all(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index dcd8afbd0c..3377330b3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -140,10 +141,17 @@ public class StreamLoadPlanner { descTable.computeStatAndMemLayout(); scanNode.finalize(analyzer); + int timeout = taskInfo.getTimeout(); + if (taskInfo instanceof RoutineLoadJob) { + // For routine load, make the timeout fo plan fragment larger than MaxIntervalS config. + // So that the execution won't be killed before consuming finished. + timeout *= 2; + } + // create dest sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout); olapTableSink.complete(); // for stream load, we only need one fragment, ScanNode -> DataSink. @@ -178,7 +186,7 @@ public class StreamLoadPlanner { params.setParams(execParams); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQueryType(TQueryType.LOAD); - queryOptions.setQueryTimeout(taskInfo.getTimeout()); + queryOptions.setQueryTimeout(timeout); queryOptions.setMemLimit(taskInfo.getMemLimit()); // for stream load, we use exec_mem_limit to limit the memory usage of load channel. queryOptions.setLoadMemLimit(taskInfo.getMemLimit());