[fix](cancel)) Fix pipeline task leak cancel (#39697)

pick #39737
This commit is contained in:
zhiqiang
2024-08-22 00:40:22 +08:00
committed by GitHub
parent 1e47d11560
commit 56cc9cc304
3 changed files with 11 additions and 9 deletions

View File

@ -186,13 +186,6 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// We need a more detail discussion.
_query_ctx->cancel(msg, Status::Cancelled(msg));
if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
if (msg.find("Pipeline task leak.") != std::string::npos) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
this->debug_string());
}
}
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {

View File

@ -147,6 +147,14 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
}
_query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);
if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
if (msg.find("Pipeline task leak.") != std::string::npos) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
this->debug_string());
}
}
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {

View File

@ -183,6 +183,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
rpc_result.running_queries.end());
return Status::OK();
};
@ -1310,8 +1311,8 @@ void FragmentMgr::cancel_worker() {
itr != running_queries_on_all_fes.end()) {
// Query not found on this frontend, and the query arrives before the last check
if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
q_ctx->get_query_arrival_timestamp().tv_nsec <
check_invalid_query_last_timestamp.tv_nsec &&
q_ctx->get_query_arrival_timestamp().tv_sec <
check_invalid_query_last_timestamp.tv_sec &&
q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
if (q_ctx->enable_pipeline_x_exec()) {
queries_pipeline_task_leak.push_back(q_ctx->query_id());