[opt](query cancel) cancel query if it has pipeline task leakage #39223 (#39537)

pick #39223 with some modifications. Optimization will only be applied
to pipeline x.
This commit is contained in:
zhiqiang
2024-08-19 14:33:59 +08:00
committed by GitHub
parent c0cbb2362c
commit 830f250a80
15 changed files with 295 additions and 37 deletions

View File

@ -41,16 +41,35 @@ public:
std::unique_ptr<ThreadPoolToken> token_;
};
const std::string toString(QuerySource queryType) {
switch (queryType) {
case QuerySource::INTERNAL_FRONTEND:
return "INTERNAL_FRONTEND";
case QuerySource::STREAM_LOAD:
return "STREAM_LOAD";
case QuerySource::GROUP_COMMIT_LOAD:
return "EXTERNAL_QUERY";
case QuerySource::ROUTINE_LOAD:
return "ROUTINE_LOAD";
case QuerySource::EXTERNAL_CONNECTOR:
return "EXTERNAL_CONNECTOR";
default:
return "UNKNOWN";
}
}
QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
const TQueryOptions& query_options, TNetworkAddress coord_addr,
bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
QuerySource query_source)
: fragment_num(total_fragment_num),
timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_is_pipeline(is_pipeline),
_is_nereids(is_nereids),
_query_options(query_options) {
_query_options(query_options),
_query_source(query_source) {
_init_query_mem_tracker();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
@ -77,7 +96,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
!this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
DCHECK_EQ(is_report_fe_addr_valid, true);
}
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
register_memory_statistics();
register_cpu_statistics();
}