From 75a6f28f2e15d940705c77af60e66964ed3301eb Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 11 Jun 2024 10:51:59 +0800 Subject: [PATCH] [cherry-pick]Add query type when report (#35918) pick #34978 --- .../schema_backend_active_tasks.cpp | 1 + be/src/runtime/fragment_mgr.cpp | 9 +++---- be/src/runtime/query_context.cpp | 24 ++++++++++++++----- .../runtime/runtime_query_statistics_mgr.cpp | 13 ++++++++-- be/src/runtime/runtime_query_statistics_mgr.h | 8 ++++--- .../org/apache/doris/catalog/SchemaTable.java | 1 + 6 files changed, 41 insertions(+), 15 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index aa84f0d68c..f1155796ed 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -38,6 +38,7 @@ std::vector SchemaBackendActiveTasksScanner::_s_tbls_ {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false}, + {"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false}, }; SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner() diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 6646b8cdf2..4ca84f9404 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -640,6 +640,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo return Status::OK(); } + LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord + << ", total fragment num on current host: " << params.fragment_num_on_host + << ", fe process uuid: " << params.query_options.fe_process_uuid + << ", query type: " << params.query_options.query_type; + // This may be a first fragment request of the query. // Create the query fragments context. query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env, @@ -653,10 +658,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->file_scan_range_params_map = params.file_scan_params; } - LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo) - << " coord_addr " << query_ctx->coord_addr - << " total fragment num on current host: " << params.fragment_num_on_host - << " fe process uuid: " << params.query_options.fe_process_uuid; query_ctx->query_globals = params.query_globals; if (params.__isset.resource_info) { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 4b4b6b006f..f9cc9757fe 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -53,7 +53,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* _query_options(query_options) { _init_query_mem_tracker(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker); - this->coord_addr = coord_addr; + _start_time = VecDateTimeValue::local_time(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _shared_scanner_controller.reset(new vectorized::SharedScannerController()); @@ -64,6 +64,18 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* timeout_second = query_options.execution_timeout; + bool is_query_type_valid = query_options.query_type == TQueryType::SELECT || + query_options.query_type == TQueryType::LOAD || + query_options.query_type == TQueryType::EXTERNAL; + DCHECK_EQ(is_query_type_valid, true); + + this->coord_addr = coord_addr; + // external query has no coord_addr + if (query_options.query_type != TQueryType::EXTERNAL) { + bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0; + DCHECK_EQ(is_coord_addr_valid, true); + } + register_memory_statistics(); register_cpu_statistics(); } @@ -252,8 +264,8 @@ void QueryContext::set_pipeline_context( } void QueryContext::register_query_statistics(std::shared_ptr qs) { - _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(_query_id), qs, - coord_addr); + _exec_env->runtime_query_statistics_mgr()->register_query_statistics( + print_id(_query_id), qs, coord_addr, _query_options.query_type); } std::shared_ptr QueryContext::get_query_statistics() { @@ -266,8 +278,8 @@ void QueryContext::register_memory_statistics() { std::shared_ptr qs = query_mem_tracker->get_query_statistics(); std::string query_id = print_id(_query_id); if (qs) { - _exec_env->runtime_query_statistics_mgr()->register_query_statistics(query_id, qs, - coord_addr); + _exec_env->runtime_query_statistics_mgr()->register_query_statistics( + query_id, qs, coord_addr, _query_options.query_type); } else { LOG(INFO) << " query " << query_id << " get memory query statistics failed "; } @@ -278,7 +290,7 @@ void QueryContext::register_cpu_statistics() { if (!_cpu_statistics) { _cpu_statistics = std::make_shared(); _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(_query_id), _cpu_statistics, coord_addr); + print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type); } } diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 955d1b9a7e..5257f53bb0 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -36,10 +36,12 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, std::shared_ptr qs_ptr, - TNetworkAddress fe_addr) { + TNetworkAddress fe_addr, + TQueryType::type query_type) { std::lock_guard write_lock(_qs_ctx_map_lock); if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { - _query_statistics_ctx_map[query_id] = std::make_unique(fe_addr); + _query_statistics_ctx_map[query_id] = + std::make_unique(fe_addr, query_type); } _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr); } @@ -54,6 +56,9 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { int64_t current_time = MonotonicMillis(); int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms; for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) { + continue; + } if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) { std::map tmp_map; fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map); @@ -247,6 +252,10 @@ void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* bloc insert_int_value(8, tqs.current_used_memory_bytes, block); insert_int_value(9, tqs.shuffle_send_bytes, block); insert_int_value(10, tqs.shuffle_send_rows, block); + + std::stringstream ss; + ss << qs_ctx_ptr->_query_type; + insert_string_value(11, ss.str(), block); } } diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 1b3e164d48..aa1793efbf 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -34,7 +34,8 @@ class Block; class QueryStatisticsCtx { public: - QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) { + QueryStatisticsCtx(TNetworkAddress fe_addr, TQueryType::type query_type) + : _fe_addr(fe_addr), _query_type(query_type) { this->_is_query_finished = false; this->_wg_id = -1; this->_query_start_time = MonotonicMillis(); @@ -46,7 +47,8 @@ public: public: std::vector> _qs_list; bool _is_query_finished; - TNetworkAddress _fe_addr; + const TNetworkAddress _fe_addr; + const TQueryType::type _query_type; int64_t _query_finish_time; int64_t _wg_id; int64_t _query_start_time; @@ -58,7 +60,7 @@ public: ~RuntimeQueryStatiticsMgr() = default; void register_query_statistics(std::string query_id, std::shared_ptr qs_ptr, - TNetworkAddress fe_addr); + TNetworkAddress fe_addr, TQueryType::type query_type); void report_runtime_query_statistics(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index a77532cef1..92206dc3fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -458,6 +458,7 @@ public class SchemaTable extends Table { .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("QUERY_TYPE", ScalarType.createVarchar(256)) .build())) .put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA, builder().column("QUERY_ID", ScalarType.createVarchar(256))