diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7ccfe251a0..d4c2fd2b69 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -703,9 +703,18 @@ Status PipelineFragmentContext::submit() { for (auto& task : _tasks) { st = scheduler->schedule_task(task.get()); if (!st) { - std::lock_guard l(_status_lock); - cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail"); - _total_tasks = submit_tasks; + { + std::lock_guard l(_status_lock); + cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail"); + } + { + // The fragment instance may contains 10 tasks, maybe 8 of them is in scheduler running + // and the 9th failed to add to scheduler, then it will update total_tasks. + // But the previous 8 tasks may finished and try to access total_tasks. So that + // has to use a lock to protect it. + std::lock_guard l(_task_mutex); + _total_tasks = submit_tasks; + } break; } submit_tasks++;