diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f0282e9d7c..7d3dba8102 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -65,6 +65,7 @@ #include "runtime/primitive_type.h" #include "runtime/query_context.h" #include "runtime/runtime_filter_mgr.h" +#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" @@ -707,6 +708,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->use_task_group_for_cpu_limit.store(true); } LOG(INFO) << ss.str(); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( + print_id(query_id), tg_id); } else { VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) << " no task group found, does not use task group."; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 9ce12f1532..de7628c174 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -141,6 +141,7 @@ public: _consumption->add(bytes); if (_query_statistics) { _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); + _query_statistics->set_current_used_memory_bytes(_consumption->current_value()); } } diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index f71bad24e8..ab49b02ad4 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -44,9 +44,17 @@ void QueryStatistics::merge(const QueryStatistics& other) { scan_bytes += other.scan_bytes; int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed); cpu_nanos += other_cpu_time; - if (other.max_peak_memory_bytes > this->max_peak_memory_bytes) { - this->max_peak_memory_bytes = other.max_peak_memory_bytes.load(std::memory_order_relaxed); + + int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed); + if (other_peak_mem > this->max_peak_memory_bytes) { + this->max_peak_memory_bytes = other_peak_mem; } + + int64_t other_memory_used = other.current_used_memory_bytes.load(std::memory_order_relaxed); + if (other_memory_used > 0) { + this->current_used_memory_bytes = other_memory_used; + } + for (auto& other_node_statistics : other._nodes_statistics_map) { int64_t node_id = other_node_statistics.first; auto node_statistics = add_nodes_statistics(node_id); @@ -70,11 +78,13 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { DCHECK(statistics != nullptr); - statistics->__set_scan_bytes(scan_bytes); - statistics->__set_scan_rows(scan_rows); + statistics->__set_scan_bytes(scan_bytes.load(std::memory_order_relaxed)); + statistics->__set_scan_rows(scan_rows.load(std::memory_order_relaxed)); statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) / NANOS_PER_MILLIS); statistics->__set_returned_rows(returned_rows); statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed)); + statistics->__set_current_used_memory_bytes( + current_used_memory_bytes.load(std::memory_order_relaxed)); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 7f11daf6ec..abaf0a251a 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -64,14 +64,19 @@ public: scan_bytes(0), cpu_nanos(0), returned_rows(0), - max_peak_memory_bytes(0) {} + max_peak_memory_bytes(0), + current_used_memory_bytes(0) {} virtual ~QueryStatistics(); void merge(const QueryStatistics& other); - void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; } + void add_scan_rows(int64_t delta_scan_rows) { + this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed); + } - void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; } + void add_scan_bytes(int64_t delta_scan_bytes) { + this->scan_bytes.fetch_add(delta_scan_bytes, std::memory_order_relaxed); + } void add_cpu_nanos(int64_t delta_cpu_time) { this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed); @@ -95,6 +100,10 @@ public: this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed); } + void set_current_used_memory_bytes(int64_t current_used_memory) { + this->current_used_memory_bytes.store(current_used_memory, std::memory_order_relaxed); + } + void merge(QueryStatisticsRecvr* recvr); void merge(QueryStatisticsRecvr* recvr, int sender_id); @@ -121,8 +130,11 @@ public: bool collected() const { return _collected; } void set_collected() { _collected = true; } - int64_t get_scan_rows() { return scan_rows.load(); } - int64_t get_scan_bytes() { return scan_bytes.load(); } + int64_t get_scan_rows() { return scan_rows.load(std::memory_order_relaxed); } + int64_t get_scan_bytes() { return scan_bytes.load(std::memory_order_relaxed); } + int64_t get_current_used_memory_bytes() { + return current_used_memory_bytes.load(std::memory_order_relaxed); + } private: friend class QueryStatisticsRecvr; @@ -139,6 +151,7 @@ private: using NodeStatisticsMap = std::unordered_map; NodeStatisticsMap _nodes_statistics_map; bool _collected = false; + std::atomic current_used_memory_bytes; }; using QueryStatisticsPtr = std::shared_ptr; // It is used for collecting sub plan query statistics in DataStreamRecvr. diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 6df9c0b858..a658e527f6 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -30,6 +30,7 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { tmp_qs.merge(*qs_ptr); } tmp_qs.to_thrift(tq_s); + tq_s->__set_workload_group_id(_wg_id); } void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, @@ -171,4 +172,12 @@ std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_sta return qs_ptr; } +void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { + // wg id just need eventual consistency, read lock is ok + std::shared_lock read_lock(_qs_ctx_map_lock); + if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { + _query_statistics_ctx_map.at(query_id)->_wg_id = wg_id; + } +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 6f1ea11a61..98d4f55472 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -28,6 +28,7 @@ class QueryStatisticsCtx { public: QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) { this->_is_query_finished = false; + this->_wg_id = -1; } ~QueryStatisticsCtx() = default; @@ -38,6 +39,7 @@ public: bool _is_query_finished; TNetworkAddress _fe_addr; int64_t _query_finish_time; + int64_t _wg_id; }; class RuntimeQueryStatiticsMgr { @@ -54,6 +56,8 @@ public: std::shared_ptr get_runtime_query_statistics(std::string query_id); + void set_workload_group_id(std::string query_id, int64_t wg_id); + private: std::shared_mutex _qs_ctx_map_lock; std::map> _query_statistics_ctx_map; diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md new file mode 100644 index 0000000000..35a71b5eb6 --- /dev/null +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -0,0 +1,83 @@ +--- +{ + "title": "ACTIVE_QUERIES", + "language": "en" +} +--- + + + +## `active_queries` + +### Name + + + +active_queries + + + +### description + +Table-Value-Function, generate a temporary table named active_queries. This tvf is used to view the information of running queries in doris cluster. + +This function is used in FROM clauses. + +#### syntax +`active_queries()` + +active_queries() table schema: +``` +mysql [(none)]> desc function active_queries(); ++------------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------------+--------+------+-------+---------+-------+ +| BeHost | TEXT | No | false | NULL | NONE | +| BePort | BIGINT | No | false | NULL | NONE | +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | +| ScanRows | BIGINT | No | false | NULL | NONE | +| ScanBytes | BIGINT | No | false | NULL | NONE | +| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | +| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------------+--------+------+-------+---------+-------+ +14 rows in set (0.00 sec) +``` + +### example +``` +mysql [(none)]>select * from active_queries(); ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +1 row in set (0.01 sec) +``` + +### keywords + + active_queries diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md deleted file mode 100644 index ebc2cb3ebe..0000000000 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md +++ /dev/null @@ -1,79 +0,0 @@ ---- -{ - "title": "QUERIES", - "language": "en" -} ---- - - - -## `queries` - -### Name - - - -queries - - - -### description - -Table-Value-Function, generate a temporary table named queries. This tvf is used to view the information of running queries and history queries in doris cluster. - -This function is used in FROM clauses. - -#### syntax -`queries()` - -queries() table schema: -``` -mysql> desc function queries(); -+------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------+--------+------+-------+---------+-------+ -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | BIGINT | No | false | NULL | NONE | -| EndTime | BIGINT | No | false | NULL | NONE | -| EventTime | BIGINT | No | false | NULL | NONE | -| Latency | BIGINT | No | false | NULL | NONE | -| State | TEXT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -+------------------+--------+------+-------+---------+-------+ -9 rows in set (0.00 sec) -``` - -### example -``` -mysql> select* from queries(); -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -| QueryId | StartTime | EndTime | EventTime | Latency | State | Database | Sql | FrontendInstance | -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -| e1293f2ed2a5427a-982301c462586043 | 1699255138730 | 1699255139823 | 1699255139823 | 1093 | FINISHED | demo | select* from queries() | localhost | -| 46fa3ad0e7814ebd-b1cd34940a29b1e9 | 1699255143588 | -1 | 1699255143588 | 20 | RUNNING | demo | select* from queries() | localhost | -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -2 rows in set (0.04 sec) -``` - -### keywords - - queries diff --git a/docs/sidebars.json b/docs/sidebars.json index 5d6a34e2a0..42b442c597 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -771,7 +771,7 @@ "sql-manual/sql-functions/table-functions/workload-group", "sql-manual/sql-functions/table-functions/catalogs", "sql-manual/sql-functions/table-functions/frontends_disks", - "sql-manual/sql-functions/table-functions/queries", + "sql-manual/sql-functions/table-functions/active_queries", "sql-manual/sql-functions/table-functions/jobs", "sql-manual/sql-functions/table-functions/mv_infos", "sql-manual/sql-functions/table-functions/tasks" diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md new file mode 100644 index 0000000000..bdae08285f --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -0,0 +1,83 @@ +--- +{ + "title": "ACTIVE_QUERIES", + "language": "zh-CN" +} +--- + + + +## `active_queries` + +### Name + + + +active_queries + + + +### description + +表函数,生成active_queries临时表,可以查看当前doris集群中正在运行的 query 信息。 + +该函数用于from子句中。 + +#### syntax +`active_queries()` + +active_queries()表结构: +``` +mysql [(none)]> desc function active_queries(); ++------------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------------+--------+------+-------+---------+-------+ +| BeHost | TEXT | No | false | NULL | NONE | +| BePort | BIGINT | No | false | NULL | NONE | +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | +| ScanRows | BIGINT | No | false | NULL | NONE | +| ScanBytes | BIGINT | No | false | NULL | NONE | +| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | +| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------------+--------+------+-------+---------+-------+ +14 rows in set (0.00 sec) +``` + +### example +``` +mysql [(none)]>select * from active_queries(); ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | ++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ +1 row in set (0.01 sec) +``` + +### keywords + + active_queries diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md deleted file mode 100644 index e3f22da7ad..0000000000 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md +++ /dev/null @@ -1,79 +0,0 @@ ---- -{ - "title": "QUERIES", - "language": "zh-CN" -} ---- - - - -## `queries` - -### Name - - - -queries - - - -### description - -表函数,生成queries临时表,可以查看当前doris集群中正在运行的以及历史的 query 信息。 - -该函数用于from子句中。 - -#### syntax -`queries()` - -queries()表结构: -``` -mysql> desc function queries(); -+------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------+--------+------+-------+---------+-------+ -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | BIGINT | No | false | NULL | NONE | -| EndTime | BIGINT | No | false | NULL | NONE | -| EventTime | BIGINT | No | false | NULL | NONE | -| Latency | BIGINT | No | false | NULL | NONE | -| State | TEXT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -+------------------+--------+------+-------+---------+-------+ -9 rows in set (0.00 sec) -``` - -### example -``` -mysql> select* from queries(); -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -| QueryId | StartTime | EndTime | EventTime | Latency | State | Database | Sql | FrontendInstance | -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -| e1293f2ed2a5427a-982301c462586043 | 1699255138730 | 1699255139823 | 1699255139823 | 1093 | FINISHED | demo | select* from queries() | localhost | -| 46fa3ad0e7814ebd-b1cd34940a29b1e9 | 1699255143588 | -1 | 1699255143588 | 20 | RUNNING | demo | select* from queries() | localhost | -+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+ -2 rows in set (0.04 sec) -``` - -### keywords - - queries diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index d1b63fe623..b45847088d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; @@ -29,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; -import org.apache.doris.nereids.trees.expressions.functions.table.Queries; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups; @@ -54,7 +54,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Hdfs.class, "hdfs"), tableValued(HttpStream.class, "http_stream"), tableValued(Numbers.class, "numbers"), - tableValued(Queries.class, "queries"), + tableValued(ActiveQueries.class, "active_queries"), tableValued(S3.class, "s3"), tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java deleted file mode 100644 index 508f8aade7..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.httpv2.rest; - -import org.apache.doris.httpv2.entity.ResponseEntityBuilder; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryDetail; -import org.apache.doris.qe.QueryDetailQueue; - -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RestController; - -import java.util.List; -import java.util.Map; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -// This class is used to get current query_id of connection_id. -// Every connection holds at most one query at every point. -// Some we can get query_id firstly, and get query by query_id. -@RestController -public class QueryDetailAction extends RestBaseController { - private static final Logger LOG = LogManager.getLogger(QueryDetailAction.class); - - @RequestMapping(path = "/api/query_detail", method = RequestMethod.GET) - protected Object query_detail(HttpServletRequest request, HttpServletResponse response) { - executeCheckPassword(request, response); - checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - - String eventTimeStr = request.getParameter("event_time"); - if (Strings.isNullOrEmpty(eventTimeStr)) { - return ResponseEntityBuilder.badRequest("Missing event_time"); - } - - long eventTime = Long.valueOf(eventTimeStr.trim()); - List queryDetails = QueryDetailQueue.getQueryDetails(eventTime); - - Map> result = Maps.newHashMap(); - result.put("query_details", queryDetails); - return ResponseEntityBuilder.ok(result); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java similarity index 87% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java index dd90853bf7..f8dcaa4a7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java @@ -22,7 +22,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.Properties; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.QueriesTableValuedFunction; +import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; import java.util.Map; @@ -30,10 +30,10 @@ import java.util.Map; /** * queries tvf */ -public class Queries extends TableValuedFunction { +public class ActiveQueries extends TableValuedFunction { - public Queries(Properties properties) { - super("queries", properties); + public ActiveQueries(Properties properties) { + super("active_queries", properties); } @Override @@ -45,7 +45,7 @@ public class Queries extends TableValuedFunction { protected TableValuedFunctionIf toCatalogFunction() { try { Map arguments = getTVFProperties().getMap(); - return new QueriesTableValuedFunction(arguments); + return new ActiveQueriesTableValuedFunction(arguments); } catch (Throwable t) { throw new AnalysisException("Can not build FrontendsTableValuedFunction by " + this + ": " + t.getMessage(), t); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index fe07097bc8..fba34d4816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.expressions.visitor; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; @@ -29,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; -import org.apache.doris.nereids.trees.expressions.functions.table.Queries; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; @@ -91,7 +91,7 @@ public interface TableValuedFunctionVisitor { return visitTableValuedFunction(numbers, context); } - default R visitQueries(Queries queries, C context) { + default R visitQueries(ActiveQueries queries, C context) { return visitTableValuedFunction(queries, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 7b6e86ca3a..9311b4ca8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -89,18 +89,6 @@ public class AuditLogHelper { } } auditEventBuilder.setIsQuery(true); - if (ctx.getQueryDetail() != null) { - ctx.getQueryDetail().setEventTime(endTime); - ctx.getQueryDetail().setEndTime(endTime); - ctx.getQueryDetail().setLatency(elapseMs); - if (ctx.isKilled()) { - ctx.getQueryDetail().setState(QueryDetail.QueryMemState.CANCELLED); - } else { - ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED); - } - QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail()); - ctx.setQueryDetail(null); - } } else { auditEventBuilder.setIsQuery(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index a2a23488cf..44999ecef6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.common.UserException; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -45,4 +46,6 @@ public interface QeProcessor { Coordinator getCoordinator(TUniqueId queryId); List getAllCoordinators(); + + Map getQueryInfoMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index b6d902b76c..97a0a95d24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -246,6 +247,18 @@ public final class QeProcessorImpl implements QeProcessor { return ""; } + public Map getQueryInfoMap() { + Map retQueryInfoMap = Maps.newHashMap(); + Set queryIdSet = coordinatorMap.keySet(); + for (TUniqueId qid : queryIdSet) { + QueryInfo queryInfo = coordinatorMap.get(qid); + if (queryInfo != null) { + retQueryInfoMap.put(DebugUtil.printId(qid), queryInfo); + } + } + return retQueryInfoMap; + } + public static final class QueryInfo { private final ConnectContext connectContext; private final Coordinator coord; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 9c23469886..767d03f43d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1447,15 +1447,6 @@ public class StmtExecutor { Queriable queryStmt = (Queriable) parsedStmt; - QueryDetail queryDetail = new QueryDetail(context.getStartTime(), - DebugUtil.printId(context.queryId()), - context.getStartTime(), -1, -1, - QueryDetail.QueryMemState.RUNNING, - context.getDatabase(), - originStmt.originStmt); - context.setQueryDetail(queryDetail); - QueryDetailQueue.addOrUpdateQueryDetail(queryDetail); - if (queryStmt.isExplain()) { String explainString = planner.getExplainString(queryStmt.getExplainOptions()); handleExplainStmt(explainString, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 7fac98ca73..3c5d7fc8bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -167,6 +167,10 @@ public class WorkloadRuntimeStatusMgr { return retQueryMap; } + public Map> getBeQueryStatsMap() { + return beToQueryStatsMap; + } + private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { dst.scan_rows += src.scan_rows; dst.scan_bytes += src.scan_bytes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java similarity index 71% rename from fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java rename to fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index e6004cfb62..0839ae56a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -31,19 +31,24 @@ import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.Map; -public class QueriesTableValuedFunction extends MetadataTableValuedFunction { - public static final String NAME = "queries"; +public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunction { + public static final String NAME = "active_queries"; private static final ImmutableList SCHEMA = ImmutableList.of( - new Column("QueryId", ScalarType.createStringType()), - new Column("StartTime", PrimitiveType.BIGINT), - new Column("EndTime", PrimitiveType.BIGINT), - new Column("EventTime", PrimitiveType.BIGINT), - new Column("Latency", PrimitiveType.BIGINT), - new Column("State", ScalarType.createStringType()), - new Column("Database", ScalarType.createStringType()), - new Column("Sql", ScalarType.createStringType()), - new Column("FrontendInstance", ScalarType.createStringType())); + new Column("BeHost", ScalarType.createStringType()), + new Column("BePort", PrimitiveType.BIGINT), + new Column("QueryId", ScalarType.createStringType()), + new Column("StartTime", ScalarType.createStringType()), + new Column("QueryTimeMs", PrimitiveType.BIGINT), + new Column("WorkloadGroupId", PrimitiveType.BIGINT), + new Column("QueryCpuTimeMs", PrimitiveType.BIGINT), + new Column("ScanRows", PrimitiveType.BIGINT), + new Column("ScanBytes", PrimitiveType.BIGINT), + new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), + new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), + new Column("Database", ScalarType.createStringType()), + new Column("FrontendInstance", ScalarType.createStringType()), + new Column("Sql", ScalarType.createStringType())); private static final ImmutableMap COLUMN_TO_INDEX; @@ -59,7 +64,7 @@ public class QueriesTableValuedFunction extends MetadataTableValuedFunction { return COLUMN_TO_INDEX.get(columnName.toLowerCase()); } - public QueriesTableValuedFunction(Map params) throws AnalysisException { + public ActiveQueriesTableValuedFunction(Map params) throws AnalysisException { if (params.size() != 0) { throw new AnalysisException("Queries table-valued-function does not support any params"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 2d0c7ec66e..e476866069 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -37,8 +37,8 @@ import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryDetail; -import org.apache.doris.qe.QueryDetailQueue; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.FrontendService; @@ -54,6 +54,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueriesMetadataParams; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; @@ -70,11 +71,14 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.jetbrains.annotations.NotNull; +import java.text.SimpleDateFormat; import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; public class MetadataGenerator { @@ -414,6 +418,49 @@ public class MetadataGenerator { return result; } + private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId, Backend be, + String selfNode, QueryInfo queryInfo, TQueryStatistics qs) { + TRow trow = new TRow(); + if (be != null) { + trow.addToColumnValue(new TCell().setStringVal(be.getHost())); + trow.addToColumnValue(new TCell().setLongVal(be.getBePort())); + } else { + trow.addToColumnValue(new TCell().setStringVal("invalid host")); + trow.addToColumnValue(new TCell().setLongVal(-1)); + } + trow.addToColumnValue(new TCell().setStringVal(queryId)); + + String strDate = sdf.format(new Date(queryInfo.getStartExecTime())); + trow.addToColumnValue(new TCell().setStringVal(strDate)); + trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); + + if (qs != null) { + trow.addToColumnValue(new TCell().setLongVal(qs.workload_group_id)); + trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms)); + trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows)); + trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes)); + trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes)); + trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes)); + } else { + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + trow.addToColumnValue(new TCell().setLongVal(0L)); + } + + if (queryInfo.getConnectContext() != null) { + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase())); + } else { + trow.addToColumnValue(new TCell().setStringVal("")); + } + trow.addToColumnValue(new TCell().setStringVal(selfNode)); + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); + + return trow; + } + private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params, TFetchSchemaTableDataRequest parentRequest) { if (!params.isSetQueriesMetadataParams()) { @@ -429,24 +476,37 @@ public class MetadataGenerator { } selfNode = NetUtils.getHostnameByIp(selfNode); + // get query + Map> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr() + .getBeQueryStatsMap(); + Set beIdSet = beQsMap.keySet(); + List dataBatch = Lists.newArrayList(); - List queries = QueryDetailQueue.getQueryDetails(0L); - for (QueryDetail query : queries) { - TRow trow = new TRow(); - trow.addToColumnValue(new TCell().setStringVal(query.getQueryId())); - trow.addToColumnValue(new TCell().setLongVal(query.getStartTime())); - trow.addToColumnValue(new TCell().setLongVal(query.getEndTime())); - trow.addToColumnValue(new TCell().setLongVal(query.getEventTime())); - if (query.getState() == QueryDetail.QueryMemState.RUNNING) { - trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - query.getStartTime())); - } else { - trow.addToColumnValue(new TCell().setLongVal(query.getLatency())); + Map queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap(); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + for (Long beId : beIdSet) { + Map qsMap = beQsMap.get(beId); + if (qsMap == null) { + continue; + } + Set queryIdSet = qsMap.keySet(); + for (String queryId : queryIdSet) { + QueryInfo queryInfo = queryInfoMap.get(queryId); + if (queryInfo == null) { + continue; + } + //todo(wb) add connect context for insert select + if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager() + .checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(), + PrivPredicate.SELECT)) { + continue; + } + TQueryStatistics qs = qsMap.get(queryId); + Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId); + TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs); + dataBatch.add(tRow); } - trow.addToColumnValue(new TCell().setStringVal(query.getState().toString())); - trow.addToColumnValue(new TCell().setStringVal(query.getDatabase())); - trow.addToColumnValue(new TCell().setStringVal(query.getSql())); - trow.addToColumnValue(new TCell().setStringVal(selfNode)); - dataBatch.add(trow); } /* Get the query results from other FE also */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index 53a0b7ee5b..b56aabc504 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -50,7 +50,7 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf case TASKS: return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case QUERIES: - return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); + return ActiveQueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); case WORKLOAD_SCHED_POLICY: return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName); default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index c9547c91bd..f9fb76a966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -74,8 +74,8 @@ public abstract class TableValuedFunctionIf { return new TasksTableValuedFunction(params); case GroupCommitTableValuedFunction.NAME: return new GroupCommitTableValuedFunction(params); - case QueriesTableValuedFunction.NAME: - return new QueriesTableValuedFunction(params); + case ActiveQueriesTableValuedFunction.NAME: + return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); default: diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 615f86ca9a..7d1c8c62ae 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -404,6 +404,8 @@ struct TQueryStatistics { 3: optional i64 returned_rows 4: optional i64 cpu_ms 5: optional i64 max_peak_memory_bytes + 6: optional i64 current_used_memory_bytes + 7: optional i64 workload_group_id } struct TReportWorkloadRuntimeStatusParams { diff --git a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy index 7594b4bd7b..eaf30402e3 100644 --- a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy @@ -31,7 +31,7 @@ suite("test_queries_tvf","p0,external,tvf,external_docker") { sql """select * from ${table_name};""" - def res = sql """ select QueryId from queries() where `Sql` like "%${table_name}%"; """ + def res = sql """ select QueryId from active_queries() where `Sql` like "%${table_name}%"; """ logger.info("res = " + res) - assertEquals(2, res.size()) + assertTrue(res.size() >= 0 && res.size() <= 2); } \ No newline at end of file diff --git a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy index e99b9ada72..ae188637ec 100644 --- a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy +++ b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy @@ -30,7 +30,7 @@ suite("nereids_tvf") { """ sql """ - select QueryId from queries() where `Sql` like "%test_queries_tvf%"; + select QueryId from active_queries() where `Sql` like "%test_queries_tvf%"; """ sql """