[Bug][RoutineLoad] Fix bug that routine load thread on BE may be blocked (#6215)

* [Bug][RoutineLoad] Fix bug that routine load thread on BE may be blocked

This bug will cause the routine load job throw TOO MANY TASK error, and routine
load job is blocked.

* fix ut

Co-authored-by: chenmingyu <chenmingyu@baidu.com>
This commit is contained in:
Mingyu Chen
2021-07-15 11:21:01 +08:00
committed by GitHub
parent 68f988b78a
commit 409cee0fdb
2 changed files with 14 additions and 3 deletions

View File

@ -40,6 +40,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@ -140,10 +141,17 @@ public class StreamLoadPlanner {
descTable.computeStatAndMemLayout();
scanNode.finalize(analyzer);
int timeout = taskInfo.getTimeout();
if (taskInfo instanceof RoutineLoadJob) {
// For routine load, make the timeout fo plan fragment larger than MaxIntervalS config.
// So that the execution won't be killed before consuming finished.
timeout *= 2;
}
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds);
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout());
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout);
olapTableSink.complete();
// for stream load, we only need one fragment, ScanNode -> DataSink.
@ -178,7 +186,7 @@ public class StreamLoadPlanner {
params.setParams(execParams);
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
queryOptions.setQueryTimeout(taskInfo.getTimeout());
queryOptions.setQueryTimeout(timeout);
queryOptions.setMemLimit(taskInfo.getMemLimit());
// for stream load, we use exec_mem_limit to limit the memory usage of load channel.
queryOptions.setLoadMemLimit(taskInfo.getMemLimit());