From 017e5a42fbf8fa8c020a48eb934b7b6e5e8ecbdb Mon Sep 17 00:00:00 2001 From: wangbo Date: Mon, 24 Mar 2025 19:59:15 +0800 Subject: [PATCH] [branch-2.1] pick 49325 fix query statistics leak in BE (#49397) pick #49325 --- .../runtime/runtime_query_statistics_mgr.cpp | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 104a22fb8b..bbc532f907 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -56,26 +56,36 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { std::lock_guard write_lock(_qs_ctx_map_lock); int64_t current_time = MonotonicMillis(); int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms; - for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { - if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) { - continue; - } - if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) { - std::map tmp_map; - fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map); - } - - TQueryStatistics ret_t_qs; - qs_ctx_ptr->collect_query_statistics(&ret_t_qs); - fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs; - + for (auto iter = _query_statistics_ctx_map.begin(); + iter != _query_statistics_ctx_map.end();) { + std::string query_id = iter->first; + auto& qs_ctx_ptr = iter->second; bool is_query_finished = qs_ctx_ptr->_is_query_finished; bool is_timeout_after_finish = false; if (is_query_finished) { is_timeout_after_finish = (current_time - qs_ctx_ptr->_query_finish_time) > conf_qs_timeout; } - qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish); + + // An external query does not need to be reported to FE, so once it has finished we can remove it directly.
+ if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL && is_query_finished) { + iter = _query_statistics_ctx_map.erase(iter); + } else { + if (qs_ctx_ptr->_query_type != TQueryType::EXTERNAL) { + if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) { + std::map tmp_map; + fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map); + } + + TQueryStatistics ret_t_qs; + qs_ctx_ptr->collect_query_statistics(&ret_t_qs); + fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs; + qs_status[query_id] = + std::make_pair(is_query_finished, is_timeout_after_finish); + } + + iter++; + } } } @@ -86,7 +96,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { // 2.1 get client Status coord_status; FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr, - &coord_status); + config::thrift_rpc_timeout_ms, &coord_status); std::string add_str = PrintThriftNetworkAddress(addr); if (!coord_status.ok()) { std::stringstream ss; @@ -116,7 +126,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { } catch (apache::thrift::transport::TTransportException& e) { LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to " << add_str << " failed, reason: " << e.what(); - rpc_status = coord.reopen(); + rpc_status = coord.reopen(config::thrift_rpc_timeout_ms); if (!rpc_status.ok()) { LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report " "workload runtime statistics to"