diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0eba79b25c..3b02373ecb 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -454,7 +454,7 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) { template void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { _is_finishing = true; - _context->cancel(true, err, Status::Cancelled(err)); + _context->cancel(err, Status::Cancelled(err)); _ended(id); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 31f8423334..2c38ef9c89 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -158,9 +158,12 @@ bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const { return false; } +// Must not add lock in this method. Because it will call query ctx cancel. And +// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running +// Method like exchange sink buffer will call query ctx cancel. If we add lock here +// There maybe dead lock. void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::lock_guard l(_cancel_lock); LOG_INFO("PipelineFragmentContext::cancel") .tag("query_id", print_id(_query_ctx->query_id())) .tag("fragment_id", _fragment_id) @@ -172,30 +175,29 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // can not be cancelled if other fragments set the query_ctx cancelled, this will // make result receiver on fe be stocked on rpc forever until timeout... // We need a more detail discussion. - if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { - if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { - _is_report_on_cancel = false; - } else { - LOG(WARNING) << "PipelineFragmentContext " - << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) - << " is canceled, cancel message: " << msg; - } - - _runtime_state->set_process_status(_query_ctx->exec_status()); - // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe - // For stream load the fragment's query_id == load id, it is set in FE. - auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); - if (stream_load_ctx != nullptr) { - stream_load_ctx->pipe->cancel(msg); - } - - // must close stream_mgr to avoid dead lock in Exchange Node - // TODO bug llj fix this other instance will not cancel - _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg)); - // Cancel the result queue manager used by spark doris connector - // TODO pipeline incomp - // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); + _query_ctx->cancel(msg, Status::Cancelled(msg)); + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { + _is_report_on_cancel = false; + } else { + LOG(WARNING) << "PipelineFragmentContext " + << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) + << " is canceled, cancel message: " << msg; } + + _runtime_state->set_process_status(_query_ctx->exec_status()); + // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe + // For stream load the fragment's query_id == load id, it is set in FE. + auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); + if (stream_load_ctx != nullptr) { + stream_load_ctx->pipe->cancel(msg); + } + + // must close stream_mgr to avoid dead lock in Exchange Node + // TODO bug llj fix this other instance will not cancel + _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg)); + // Cancel the result queue manager used by spark doris connector + // TODO pipeline incomp + // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); } PipelinePtr PipelineFragmentContext::add_pipeline() { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 8ad36612f4..96936233b3 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -213,7 +213,6 @@ protected: VecDateTimeValue _start_time; int _timeout = -1; - std::mutex _cancel_lock; private: std::vector> _tasks; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index fa53e6f4b1..4419ecbe7f 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -126,7 +126,6 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::lock_guard l(_cancel_lock); LOG_INFO("PipelineXFragmentContext::cancel") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) @@ -135,25 +134,24 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, if (reason == PPlanFragmentCancelReason::TIMEOUT) { LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); } - if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) { - if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { - _is_report_on_cancel = false; - } else { - for (auto& id : _fragment_instance_ids) { - LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id); - } + _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id); + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { + _is_report_on_cancel = false; + } else { + for (auto& id : _fragment_instance_ids) { + LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id); } - // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe - // For stream load the fragment's query_id == load id, it is set in FE. - auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); - if (stream_load_ctx != nullptr) { - stream_load_ctx->pipe->cancel(msg); - } - - // Cancel the result queue manager used by spark doris connector - // TODO pipeline incomp - // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); } + // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe + // For stream load the fragment's query_id == load id, it is set in FE. + auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); + if (stream_load_ctx != nullptr) { + stream_load_ctx->pipe->cancel(msg); + } + + // Cancel the result queue manager used by spark doris connector + // TODO pipeline incomp + // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); for (auto& tasks : _tasks) { for (auto& task : tasks) { task->clear_blocking_state(); @@ -1326,7 +1324,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) { close_a_pipeline(); } } - _query_ctx->cancel(true, st.to_string(), st, _fragment_id); + _query_ctx->cancel(st.to_string(), st, _fragment_id); } void PipelineXFragmentContext::_close_fragment_instance() { diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 3e03f3636f..8819067e59 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -135,7 +135,7 @@ void BlockedTaskScheduler::_schedule() { << ", instance_id=" << print_id(task->instance_id()) << ", task info: " << task->debug_string(); - task->query_context()->cancel(true, "", Status::Cancelled("")); + task->query_context()->cancel("", Status::Cancelled("")); _make_task_run(local_blocked_tasks, iter); } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { if (task->has_dependency()) { @@ -241,7 +241,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, std::string(status.msg())); } else { - task->query_context()->cancel(true, status.to_string(), + task->query_context()->cancel(status.to_string(), Status::Cancelled(status.to_string())); } state = PipelineTaskState::CANCELED; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d852385d26..68c4afa382 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1002,7 +1002,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan } } - query_ctx->cancel(true, msg, Status::Cancelled(msg)); + query_ctx->cancel(msg, Status::Cancelled(msg)); { std::lock_guard state_lock(_lock); _query_ctx_map.erase(query_id); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 4fb5df7c7d..681d0e333c 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -157,12 +157,14 @@ void QueryContext::set_execution_dependency_ready() { _execution_dependency->set_ready(); } -bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragment_id) { - if (_is_cancelled) { - return false; +void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) { + // Just for CAS need a left value + bool false_cancel = false; + if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) { + return; } + DCHECK(!false_cancel && _is_cancelled); set_exec_status(new_status); - _is_cancelled.store(v); set_ready_to_execute(true); std::vector> ctx_to_cancel; @@ -175,12 +177,14 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme ctx_to_cancel.push_back(f_context); } } + // Must not add lock here. There maybe dead lock because it will call fragment + // ctx cancel and fragment ctx will call query ctx cancel. for (auto& f_context : ctx_to_cancel) { if (auto pipeline_ctx = f_context.lock()) { pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg); } } - return true; + return; } void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason, diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 1551af46c9..5dd0999a63 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -114,7 +114,7 @@ public: const std::string& msg); void set_pipeline_context(const int fragment_id, std::shared_ptr pip_ctx); - bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1); + void cancel(std::string msg, Status new_status, int fragment_id = -1); void set_exec_status(Status new_status) { if (new_status.ok()) {