[branch-2.1] pick 49325 fix query statistics leak in BE (#49397)
pick #49325
This commit is contained in:
@ -56,26 +56,36 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
|
||||
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
|
||||
int64_t current_time = MonotonicMillis();
|
||||
int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
|
||||
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
|
||||
if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
|
||||
continue;
|
||||
}
|
||||
if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
|
||||
std::map<std::string, TQueryStatistics> tmp_map;
|
||||
fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
|
||||
}
|
||||
|
||||
TQueryStatistics ret_t_qs;
|
||||
qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
|
||||
fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
|
||||
|
||||
for (auto iter = _query_statistics_ctx_map.begin();
|
||||
iter != _query_statistics_ctx_map.end();) {
|
||||
std::string query_id = iter->first;
|
||||
auto& qs_ctx_ptr = iter->second;
|
||||
bool is_query_finished = qs_ctx_ptr->_is_query_finished;
|
||||
bool is_timeout_after_finish = false;
|
||||
if (is_query_finished) {
|
||||
is_timeout_after_finish =
|
||||
(current_time - qs_ctx_ptr->_query_finish_time) > conf_qs_timeout;
|
||||
}
|
||||
qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish);
|
||||
|
||||
// external query not need to report to FE, so we can remove it directly.
|
||||
if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL && is_query_finished) {
|
||||
iter = _query_statistics_ctx_map.erase(iter);
|
||||
} else {
|
||||
if (qs_ctx_ptr->_query_type != TQueryType::EXTERNAL) {
|
||||
if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
|
||||
std::map<std::string, TQueryStatistics> tmp_map;
|
||||
fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
|
||||
}
|
||||
|
||||
TQueryStatistics ret_t_qs;
|
||||
qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
|
||||
fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
|
||||
qs_status[query_id] =
|
||||
std::make_pair(is_query_finished, is_timeout_after_finish);
|
||||
}
|
||||
|
||||
iter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,7 +96,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
|
||||
// 2.1 get client
|
||||
Status coord_status;
|
||||
FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
|
||||
&coord_status);
|
||||
config::thrift_rpc_timeout_ms, &coord_status);
|
||||
std::string add_str = PrintThriftNetworkAddress(addr);
|
||||
if (!coord_status.ok()) {
|
||||
std::stringstream ss;
|
||||
@ -116,7 +126,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
|
||||
} catch (apache::thrift::transport::TTransportException& e) {
|
||||
LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to "
|
||||
<< add_str << " failed, reason: " << e.what();
|
||||
rpc_status = coord.reopen();
|
||||
rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);
|
||||
if (!rpc_status.ok()) {
|
||||
LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report "
|
||||
"workload runtime statistics to"
|
||||
|
||||
Reference in New Issue
Block a user