diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index b9a5cb06ff..47f96c5f8f 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -242,6 +242,8 @@ public: } } + virtual bool is_finished() const { return false; } + virtual void set_close_pipeline_time() { if (!_is_close_pipeline) { _close_pipeline_time = _pipeline_task_watcher.elapsed_time(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 83c0782784..d51cd67072 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -88,7 +88,7 @@ public: void finalize() override; - bool is_finished() const { return _finished.load(); } + bool is_finished() const override { return _finished.load(); } std::string debug_string() override; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c40193cb0e..f2c8616818 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -231,6 +231,10 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status // Should count the memory to the query or the query's memory will not decrease when part of // task finished. SCOPED_ATTACH_TASK(task->runtime_state()); + if (task->is_finished()) { + task->set_running(false); + return; + } // close_a_pipeline may delete fragment context and will core in some defer // code, because the defer code will access fragment context it self. auto lock_for_context = task->fragment_context()->shared_from_this();