From 4532ba990a336b78027d4e33d06b79745214cf7b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 4 Jul 2024 10:02:56 +0800 Subject: [PATCH] [fix](pipeline) Avoid to close task twice (#36747) (#37115) --- be/src/pipeline/pipeline_task.h | 2 ++ be/src/pipeline/pipeline_x/pipeline_x_task.h | 2 +- be/src/pipeline/task_scheduler.cpp | 4 ++++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index b9a5cb06ff..47f96c5f8f 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -242,6 +242,8 @@ public: } } + virtual bool is_finished() const { return false; } + virtual void set_close_pipeline_time() { if (!_is_close_pipeline) { _close_pipeline_time = _pipeline_task_watcher.elapsed_time(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 83c0782784..d51cd67072 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -88,7 +88,7 @@ public: void finalize() override; - bool is_finished() const { return _finished.load(); } + bool is_finished() const override { return _finished.load(); } std::string debug_string() override; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c40193cb0e..f2c8616818 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -231,6 +231,10 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status // Should count the memory to the query or the query's memory will not decrease when part of // task finished. SCOPED_ATTACH_TASK(task->runtime_state()); + if (task->is_finished()) { + task->set_running(false); + return; + } // close_a_pipeline may delete fragment context and will core in some defer // code, because the defer code will access fragment context it self. auto lock_for_context = task->fragment_context()->shared_from_this();