From 28f0b7eb321ec6839808d252bf0d04775df18e1a Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 6 Mar 2024 13:53:57 +0800 Subject: [PATCH] [Improvement](profile)Add tvf active_be_tasks() #31815 --- .../runtime/runtime_query_statistics_mgr.cpp | 50 ++++++++++++ be/src/runtime/runtime_query_statistics_mgr.h | 5 ++ be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +++++- be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../catalog/BuiltinTableValuedFunctions.java | 4 +- .../functions/table/ActiveBeTasks.java | 58 ++++++++++++++ .../functions/table/ActiveQueries.java | 2 +- .../visitor/TableValuedFunctionVisitor.java | 5 ++ .../ActiveBeTasksTableValuedFunction.java | 76 +++++++++++++++++++ .../ActiveQueriesTableValuedFunction.java | 4 +- .../tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/Types.thrift | 3 +- 12 files changed, 227 insertions(+), 6 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index cb137fbe82..ee09b0c30d 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -199,4 +199,54 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64 } } +std::vector RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics( + std::vector filter_columns) { + std::shared_lock read_lock(_qs_ctx_map_lock); + std::vector table_rows; + int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + + for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + TRow trow; + + TQueryStatistics tqs; + qs_ctx_ptr->collect_query_statistics(&tqs); + + for (auto iter = filter_columns.begin(); iter != filter_columns.end(); iter++) { + std::string col_name = *iter; + + TCell tcell; + if (col_name == "beid") { + tcell.longVal = be_id; + } else if (col_name == "fehost") { + tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname; + } else if (col_name == "queryid") { + tcell.stringVal = query_id; + } else if (col_name == "tasktimems") { + if (qs_ctx_ptr->_is_query_finished) { + tcell.longVal = qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time; + } else { + tcell.longVal = MonotonicMillis() - qs_ctx_ptr->_query_start_time; + } + } else if (col_name == "taskcputimems") { + tcell.longVal = tqs.cpu_ms; + } else if (col_name == "scanrows") { + tcell.longVal = tqs.scan_rows; + } else if (col_name == "scanbytes") { + tcell.longVal = tqs.scan_bytes; + } else if (col_name == "bepeakmemorybytes") { + tcell.longVal = tqs.max_peak_memory_bytes; + } else if (col_name == "currentusedmemorybytes") { + tcell.longVal = tqs.current_used_memory_bytes; + } else if (col_name == "shufflesendbytes") { + tcell.longVal = tqs.shuffle_send_bytes; + } else if (col_name == "shufflesendRows") { + tcell.longVal = tqs.shuffle_send_rows; + } + trow.column_value.push_back(tcell); + } + table_rows.push_back(trow); + } + return table_rows; +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 69b283b6d1..44badd196a 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include @@ -66,6 +68,9 @@ public: void get_metric_map(std::string query_id, std::map& metric_map); + // used for tvf active_queries + std::vector get_active_be_tasks_statistics(std::vector filter_columns); + private: std::shared_mutex _qs_ctx_map_lock; std::map> _query_statistics_ctx_map; diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 22545fa4dc..7d438366cf 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -34,6 +34,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/thrift_rpc_helper.h" @@ -95,7 +96,12 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable"); } - RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); + if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ACTIVE_BE_TASKS) { + // tvf active_be_tasks fetch data in be directly, it does not need to request FE for data + RETURN_IF_ERROR(_build_active_be_tasks_data()); + } else { + RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); + } return Status::OK(); } @@ -288,6 +294,20 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } +Status VMetaScanner::_build_active_be_tasks_data() { + std::vector filter_columns; + for (const auto& slot : _tuple_desc->slots()) { + filter_columns.emplace_back(slot->col_name_lower_case()); + } + + std::vector ret = + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics( + filter_columns); + _batch_data = std::move(ret); + + return Status::OK(); +} + Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 59bd55dc2d..25cb934531 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -91,6 +91,8 @@ private: TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_active_be_tasks_data(); + bool _meta_eos; TupleId _tuple_id; TUserIdentity _user_identity; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index ac1b31fcea..acdeb683f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -59,7 +60,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, "tasks"), - tableValued(WorkloadGroups.class, "workload_groups") + tableValued(WorkloadGroups.class, "workload_groups"), + tableValued(ActiveBeTasks.class, "active_be_tasks") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java new file mode 100644 index 0000000000..5737f52a2b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** + * stands be running tasks status, currently main including select/streamload/broker load/insert select + */ +public class ActiveBeTasks extends TableValuedFunction { + + public ActiveBeTasks(Properties properties) { + super("active_be_tasks", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new ActiveBeTasksTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build ActiveBeTasksTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitActiveBeTasks(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java index f8dcaa4a7e..1e15b67424 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java @@ -47,7 +47,7 @@ public class ActiveQueries extends TableValuedFunction { Map arguments = getTVFProperties().getMap(); return new ActiveQueriesTableValuedFunction(arguments); } catch (Throwable t) { - throw new AnalysisException("Can not build FrontendsTableValuedFunction by " + throw new AnalysisException("Can not build ActiveQueriesTableValuedFunction by " + this + ": " + t.getMessage(), t); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index fba34d4816..36561e5b12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.expressions.visitor; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -102,4 +103,8 @@ public interface TableValuedFunctionVisitor { default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { return visitTableValuedFunction(workloadGroups, context); } + + default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) { + return visitTableValuedFunction(beTasks, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java new file mode 100644 index 0000000000..99a8ba4886 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; + +public class ActiveBeTasksTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "active_be_tasks"; + + private static final ImmutableList SCHEMA = ImmutableList.of( + new Column("BeId", PrimitiveType.BIGINT), + new Column("FeHost", ScalarType.createStringType()), + new Column("QueryId", ScalarType.createStringType()), + new Column("TaskTimeMs", PrimitiveType.BIGINT), + new Column("TaskCpuTimeMs", PrimitiveType.BIGINT), + new Column("ScanRows", PrimitiveType.BIGINT), + new Column("ScanBytes", PrimitiveType.BIGINT), + new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), + new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), + new Column("ShuffleSendBytes", PrimitiveType.BIGINT), + new Column("ShuffleSendRows", PrimitiveType.BIGINT)); + + public ActiveBeTasksTableValuedFunction(Map params) throws AnalysisException { + if (params.size() != 0) { + throw new AnalysisException("ActiveBeTasks table-valued-function does not support any params"); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.ACTIVE_BE_TASKS; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS); + return metaScanRange; + } + + @Override + public String getTableName() { + return "ActiveBeTasksTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + return SCHEMA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index 27f65ed768..41dd5484dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -68,7 +68,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio public ActiveQueriesTableValuedFunction(Map params) throws AnalysisException { if (params.size() != 0) { - throw new AnalysisException("Queries table-valued-function does not support any params"); + throw new AnalysisException("ActiveQueries table-valued-function does not support any params"); } } @@ -90,7 +90,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio @Override public String getTableName() { - return "QueriesTableValuedFunction"; + return "ActiveQueriesTableValuedFunction"; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index f9fb76a966..4b755a97bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -78,6 +78,8 @@ public abstract class TableValuedFunctionIf { return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); + case ActiveBeTasksTableValuedFunction.NAME: + return new ActiveBeTasksTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 11ec1093da..a1e414384d 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -711,7 +711,8 @@ enum TMetadataType { JOBS, TASKS, QUERIES, - WORKLOAD_SCHED_POLICY + WORKLOAD_SCHED_POLICY, + ACTIVE_BE_TASKS, } enum TIcebergQueryType {