diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 574ebd1bd0..9280e66d49 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -140,7 +140,10 @@ public: // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown() { - _shutdown = true; + { + std::unique_lock l(_lock); + _shutdown = true; + } _get_cv.notify_all(); _put_cv.notify_all(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index dcd8afbd0c..3377330b3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -140,10 +141,17 @@ public class StreamLoadPlanner { descTable.computeStatAndMemLayout(); scanNode.finalize(analyzer); + int timeout = taskInfo.getTimeout(); + if (taskInfo instanceof RoutineLoadJob) { + // For routine load, make the timeout fo plan fragment larger than MaxIntervalS config. + // So that the execution won't be killed before consuming finished. + timeout *= 2; + } + // create dest sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout); olapTableSink.complete(); // for stream load, we only need one fragment, ScanNode -> DataSink. @@ -178,7 +186,7 @@ public class StreamLoadPlanner { params.setParams(execParams); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQueryType(TQueryType.LOAD); - queryOptions.setQueryTimeout(taskInfo.getTimeout()); + queryOptions.setQueryTimeout(timeout); queryOptions.setMemLimit(taskInfo.getMemLimit()); // for stream load, we use exec_mem_limit to limit the memory usage of load channel. queryOptions.setLoadMemLimit(taskInfo.getMemLimit());