diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index be4fd6c5ae..7ccfe251a0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -713,7 +713,7 @@ Status PipelineFragmentContext::submit() { if (!st.ok()) { std::lock_guard l(_task_mutex); if (_closed_tasks == _total_tasks) { - std::call_once(_close_once_flag, [this] { _close_action(); }); + _close_fragment_instance(); } return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(), BackendOptions::get_localhost()); @@ -851,7 +851,13 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr return _root_pipeline->set_sink(sink_); } -void PipelineFragmentContext::_close_action() { +// If all pipeline tasks binded to the fragment instance are finished, then we could +// close the fragment instance. +void PipelineFragmentContext::_close_fragment_instance() { + if (_is_fragment_instance_closed) { + return; + } + Defer defer_op {[&]() { _is_fragment_instance_closed = true; }}; _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast(send_report(true)); // all submitted tasks done @@ -862,7 +868,7 @@ void PipelineFragmentContext::close_a_pipeline() { std::lock_guard l(_task_mutex); ++_closed_tasks; if (_closed_tasks == _total_tasks) { - std::call_once(_close_once_flag, [this] { _close_action(); }); + _close_fragment_instance(); } } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index aa2f139c50..39e3dcbe16 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -153,7 +153,7 @@ protected: virtual Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request); template Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); - virtual void _close_action(); + virtual void _close_fragment_instance(); void _init_next_report_time(); void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } @@ -205,7 +205,7 @@ protected: RuntimeProfile::Counter* _prepare_timer; std::function _call_back; - std::once_flag _close_once_flag; + bool _is_fragment_instance_closed = false; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 28e1be496f..b087c3bcab 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -939,7 +939,7 @@ Status PipelineXFragmentContext::submit() { if (!st.ok()) { std::lock_guard l(_task_mutex); if (_closed_tasks == _total_tasks) { - std::call_once(_close_once_flag, [this] { _close_action(); }); + _close_fragment_instance(); } return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(), BackendOptions::get_localhost()); @@ -969,7 +969,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() { } } -void PipelineXFragmentContext::_close_action() { +void PipelineXFragmentContext::_close_fragment_instance() { + if (_is_fragment_instance_closed) { + return; + } + Defer defer_op {[&]() { _is_fragment_instance_closed = true; }}; _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast(send_report(true)); // all submitted tasks done diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 9e7ff42219..23ff08fcb0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -118,7 +118,7 @@ public: std::string debug_string() override; private: - void _close_action() override; + void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, const std::vector& texprs); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e989af75b2..5470ba2259 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -390,6 +390,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, task->set_close_pipeline_time(); task->release_dependency(); task->set_running(false); + // close_a_pipeline may delete fragment context and will core in some defer + // code, because the defer code will access fragment context it self. + std::shared_ptr lock_for_context = + task->fragment_context()->shared_from_this(); task->fragment_context()->close_a_pipeline(); }