From c5390d00bba4e6312980d420850c9d5c4e92a612 Mon Sep 17 00:00:00 2001 From: wangbo Date: Sat, 9 Mar 2024 10:19:07 +0800 Subject: [PATCH] [Improvement]Add schema table backend_active_tasks (#31945) --- be/src/exec/schema_scanner.cpp | 3 + .../schema_backend_active_tasks.cpp | 94 +++++++++++++++++++ .../schema_backend_active_tasks.h | 49 ++++++++++ .../runtime/runtime_query_statistics_mgr.cpp | 79 ++++++++-------- be/src/runtime/runtime_query_statistics_mgr.h | 8 +- be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +---- be/src/vec/exec/scan/vmeta_scanner.h | 1 - .../doris/analysis/SchemaTableType.java | 3 +- .../catalog/BuiltinTableValuedFunctions.java | 4 +- .../org/apache/doris/catalog/SchemaTable.java | 14 +++ .../functions/table/ActiveBeTasks.java | 58 ------------ .../visitor/TableValuedFunctionVisitor.java | 5 - .../BackendPartitionedSchemaScanNode.java | 12 ++- .../ActiveBeTasksTableValuedFunction.java | 76 --------------- .../tablefunction/TableValuedFunctionIf.java | 2 - .../doris/datasource/RefreshCatalogTest.java | 4 +- gensrc/thrift/Descriptors.thrift | 3 +- gensrc/thrift/Types.thrift | 3 +- .../jdbc/test_mariadb_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog_nereids.out | 1 + .../test_backend_active_tasks.groovy | 43 +++++++++ 22 files changed, 270 insertions(+), 216 deletions(-) create mode 100644 be/src/exec/schema_scanner/schema_backend_active_tasks.cpp create mode 100644 be/src/exec/schema_scanner/schema_backend_active_tasks.h delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java create mode 100644 regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index b700d36f20..bff59130e8 100644 --- a/be/src/exec/schema_scanner.cpp +++ 
b/be/src/exec/schema_scanner.cpp @@ -26,6 +26,7 @@ #include #include +#include "exec/schema_scanner/schema_backend_active_tasks.h" #include "exec/schema_scanner/schema_charsets_scanner.h" #include "exec/schema_scanner/schema_collations_scanner.h" #include "exec/schema_scanner/schema_columns_scanner.h" @@ -149,6 +150,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaMetadataNameIdsScanner::create_unique(); case TSchemaTableType::SCH_PROFILING: return SchemaProfilingScanner::create_unique(); + case TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS: + return SchemaBackendActiveTasksScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp new file mode 100644 index 0000000000..c5f8825c2e --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "exec/schema_scanner/schema_backend_active_tasks.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
+        // name, type, value size in bytes, 4th flag (presumably is_null; confirm in ColumnDesc)
+        {"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
+        {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+};
+
+SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS) {}
+
+SchemaBackendActiveTasksScanner::~SchemaBackendActiveTasksScanner() = default;
+// Takes the per-call row limit from the query's batch size.
+Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
+    _block_rows_limit = state->batch_size();
+    return Status::OK();
+}
+// Emits the active-task snapshot in slices; sets *eos once the last row has been copied out.
+Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("Used before initialized.");
+    }
+
+    if (nullptr == block || nullptr == eos) {
+        return Status::InternalError("input pointer is nullptr.");
+    }
+    // First call only: build one nullable column per descriptor, then snapshot the statistics.
+    if (_task_stats_block == nullptr) {
+        _task_stats_block = vectorized::Block::create_unique();
+
+        for (const auto& column_desc : _s_tbls_columns) {
+            TypeDescriptor descriptor(column_desc.type);
+            auto data_type =
+                    vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+            _task_stats_block->insert(vectorized::ColumnWithTypeAndName(
+                    data_type->create_column(), data_type, column_desc.name));
+        }
+
+        _task_stats_block->reserve(_block_rows_limit);
+
+        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
+                _task_stats_block.get());
+        _total_rows = _task_stats_block->rows();
+    }
+    // Every snapshot row has already been returned.
+    if (_row_idx == _total_rows) {
+        *eos = true;
+        return Status::OK();
+    }
+    // Copy the next slice, at most _block_rows_limit rows, into the caller's block.
+    int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
+    vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
+    mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
+    _row_idx += current_batch_rows;
+
+    *eos = _row_idx == _total_rows;
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
new file mode 100644
index 0000000000..d8a2a1ffa3
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+// Schema scanner that serves the rows of the backend_active_tasks schema table.
+class SchemaBackendActiveTasksScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaBackendActiveTasksScanner);
+
+public:
+    SchemaBackendActiveTasksScanner();
+    ~SchemaBackendActiveTasksScanner() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+    int _block_rows_limit = 4096; // max rows copied out per get_next_block() call
+    int _row_idx = 0;             // offset of the next snapshot row to hand out
+    int _total_rows = 0;          // number of rows held by _task_stats_block
+    std::unique_ptr<vectorized::Block> _task_stats_block = nullptr;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ee09b0c30d..9764b0f050 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -21,6 +21,7 @@
 #include "runtime/exec_env.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "vec/core/block.h"
 
 namespace doris {
 
@@ -199,54 +200,52 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
     }
 }
 
-std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics(
-        std::vector<std::string> filter_columns) {
+void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
     std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-    std::vector<TRow> table_rows;
     int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
 
-    for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
-        TRow trow;
+    auto insert_int_value = [block](int col_index, int64_t int_val) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
+                int_val);
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
 
+    auto insert_string_value = [block](int col_index, const std::string& str_val) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+                                                                          str_val.size());
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+    // Append one row per tracked query. The column positions used below must stay in sync
+    // with SchemaBackendActiveTasksScanner::_s_tbls_columns, which defines the block's schema.
+    for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
         TQueryStatistics tqs;
         qs_ctx_ptr->collect_query_statistics(&tqs);
+        insert_int_value(0, be_id);
+        insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname);
+        insert_string_value(2, query_id);
 
-        for (auto iter = filter_columns.begin(); iter != filter_columns.end(); iter++) {
-            std::string col_name = *iter;
-
-            TCell tcell;
-            if (col_name == "beid") {
-                tcell.longVal = be_id;
-            } else if (col_name == "fehost") {
-                tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname;
-            } else if (col_name == "queryid") {
-                tcell.stringVal = query_id;
-            } else if (col_name == "tasktimems") {
-                if (qs_ctx_ptr->_is_query_finished) {
-                    tcell.longVal = qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time;
-                } else {
-                    tcell.longVal = MonotonicMillis() - qs_ctx_ptr->_query_start_time;
-                }
-            } else if (col_name == "taskcputimems") {
-                tcell.longVal = tqs.cpu_ms;
-            } else if (col_name == "scanrows") {
-                tcell.longVal = tqs.scan_rows;
-            } else if (col_name == "scanbytes") {
-                tcell.longVal = tqs.scan_bytes;
-            } else if (col_name == "bepeakmemorybytes") {
-                tcell.longVal = tqs.max_peak_memory_bytes;
-            } else if (col_name == "currentusedmemorybytes") {
-                tcell.longVal = tqs.current_used_memory_bytes;
-            } else if (col_name == "shufflesendbytes") {
-                tcell.longVal = tqs.shuffle_send_bytes;
-            } else if (col_name == "shufflesendRows") {
-                tcell.longVal = tqs.shuffle_send_rows;
-            }
-            trow.column_value.push_back(tcell);
-        }
-        table_rows.push_back(trow);
+        int64_t task_time = qs_ctx_ptr->_is_query_finished
+                                    ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time
+                                    : MonotonicMillis() - qs_ctx_ptr->_query_start_time;
+        insert_int_value(3, task_time);
+        insert_int_value(4, tqs.cpu_ms);
+        insert_int_value(5, tqs.scan_rows);
+        insert_int_value(6, tqs.scan_bytes);
+        insert_int_value(7, tqs.max_peak_memory_bytes);
+        insert_int_value(8, tqs.current_used_memory_bytes);
+        insert_int_value(9, tqs.shuffle_send_bytes);
+        insert_int_value(10, tqs.shuffle_send_rows);
     }
-    return table_rows;
 }
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h
index 44badd196a..1b3e164d48 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -28,6 +28,10 @@
 
 namespace doris {
 
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
 class QueryStatisticsCtx {
 public:
     QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
@@ -68,8 +72,8 @@ public:
     void get_metric_map(std::string query_id,
                         std::map<WorkloadMetricType, std::string>& metric_map);
 
-    // used for tvf active_queries
-    std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> filter_columns);
+    // Appends one row per tracked query to block; backs the backend_active_tasks schema table.
+    void get_active_be_tasks_block(vectorized::Block* block);
 
 private:
     std::shared_mutex _qs_ctx_map_lock;
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 7d438366cf..22545fa4dc 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -34,7 
+34,6 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/thrift_rpc_helper.h" @@ -96,12 +95,7 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable"); } - if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ACTIVE_BE_TASKS) { - // tvf active_be_tasks fetch data in be directly, it does not need to request FE for data - RETURN_IF_ERROR(_build_active_be_tasks_data()); - } else { - RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); - } + RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); return Status::OK(); } @@ -294,20 +288,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } -Status VMetaScanner::_build_active_be_tasks_data() { - std::vector filter_columns; - for (const auto& slot : _tuple_desc->slots()) { - filter_columns.emplace_back(slot->col_name_lower_case()); - } - - std::vector ret = - ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics( - filter_columns); - _batch_data = std::move(ret); - - return Status::OK(); -} - Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 25cb934531..518f42ffc1 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -91,7 +91,6 @@ private: TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); - Status _build_active_be_tasks_data(); 
bool _meta_eos; TupleId _tuple_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 59bae154d5..7a7d547e24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -69,7 +69,8 @@ public enum SchemaTableType { SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS), SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS), SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", TSchemaTableType.SCH_METADATA_NAME_IDS), - SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING); + SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING), + SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS); private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index acdeb683f2..ac1b31fcea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -60,8 +59,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, 
"tasks"), - tableValued(WorkloadGroups.class, "workload_groups"), - tableValued(ActiveBeTasks.class, "active_be_tasks") + tableValued(WorkloadGroups.class, "workload_groups") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 7215cf0fc7..9e06841861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -444,6 +444,20 @@ public class SchemaTable extends Table { .column("SOURCE_FILE", ScalarType.createVarchar(20)) .column("SOURCE_LINE", ScalarType.createType(PrimitiveType.INT)) .build())) + .put("backend_active_tasks", + new SchemaTable(SystemIdGenerator.getNextId(), "backend_active_tasks", TableType.SCHEMA, + builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("FE_HOST", ScalarType.createVarchar(256)) + .column("QUERY_ID", ScalarType.createVarchar(256)) + .column("TASK_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TASK_CPU_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCAN_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BE_PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .build())) .build(); protected SchemaTable(long id, String name, TableType type, List baseSchema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java deleted file mode 100644 index 5737f52a2b..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.nereids.trees.expressions.functions.table; - -import org.apache.doris.catalog.FunctionSignature; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.trees.expressions.Properties; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; -import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction; -import org.apache.doris.tablefunction.TableValuedFunctionIf; - -import java.util.Map; - -/** - * stands be running tasks status, currently main including select/streamload/broker load/insert select - */ -public class ActiveBeTasks extends TableValuedFunction { - - public ActiveBeTasks(Properties properties) { - super("active_be_tasks", properties); - } - - @Override - public FunctionSignature customSignature() { - return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); - } - - @Override - protected TableValuedFunctionIf toCatalogFunction() { - try { - Map arguments = getTVFProperties().getMap(); - return new ActiveBeTasksTableValuedFunction(arguments); - } catch (Throwable t) { - throw new AnalysisException("Can not build ActiveBeTasksTableValuedFunction by " - + this + ": " + t.getMessage(), t); - } - } - - public R accept(ExpressionVisitor visitor, C context) { - return visitor.visitActiveBeTasks(this, context); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 36561e5b12..fba34d4816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,7 +17,6 @@ package 
org.apache.doris.nereids.trees.expressions.visitor; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -103,8 +102,4 @@ public interface TableValuedFunctionVisitor { default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { return visitTableValuedFunction(workloadGroups, context); } - - default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) { - return visitTableValuedFunction(beTasks, context); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 592fc3c96c..dc57b67d98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -41,8 +41,10 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * The BackendSchemaScanNode used for those SchemaTable which data are need to acquire from backends. @@ -51,10 +53,16 @@ import java.util.Map; * So, we can use partitionInfo to select the necessary `be` to send query. 
*/ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { - public static final String ROWSETS = "rowsets"; + + public static final Set BACKEND_TABLE = new HashSet<>(); + + static { + BACKEND_TABLE.add("rowsets"); + BACKEND_TABLE.add("backend_active_tasks"); + } public static boolean isBackendPartitionedSchemaTable(String tableName) { - if (tableName.equalsIgnoreCase(ROWSETS)) { + if (BACKEND_TABLE.contains(tableName.toLowerCase())) { return true; } return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java deleted file mode 100644 index 99a8ba4886..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.tablefunction; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.thrift.TMetaScanRange; -import org.apache.doris.thrift.TMetadataType; - -import com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.Map; - -public class ActiveBeTasksTableValuedFunction extends MetadataTableValuedFunction { - - public static final String NAME = "active_be_tasks"; - - private static final ImmutableList SCHEMA = ImmutableList.of( - new Column("BeId", PrimitiveType.BIGINT), - new Column("FeHost", ScalarType.createStringType()), - new Column("QueryId", ScalarType.createStringType()), - new Column("TaskTimeMs", PrimitiveType.BIGINT), - new Column("TaskCpuTimeMs", PrimitiveType.BIGINT), - new Column("ScanRows", PrimitiveType.BIGINT), - new Column("ScanBytes", PrimitiveType.BIGINT), - new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), - new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendRows", PrimitiveType.BIGINT)); - - public ActiveBeTasksTableValuedFunction(Map params) throws AnalysisException { - if (params.size() != 0) { - throw new AnalysisException("ActiveBeTasks table-valued-function does not support any params"); - } - } - - @Override - public TMetadataType getMetadataType() { - return TMetadataType.ACTIVE_BE_TASKS; - } - - @Override - public TMetaScanRange getMetaScanRange() { - TMetaScanRange metaScanRange = new TMetaScanRange(); - metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS); - return metaScanRange; - } - - @Override - public String getTableName() { - return "ActiveBeTasksTableValuedFunction"; - } - - @Override - public List getTableColumns() throws AnalysisException { - return SCHEMA; - } -} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 4b755a97bf..f9fb76a966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -78,8 +78,6 @@ public abstract class TableValuedFunctionIf { return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); - case ActiveBeTasksTableValuedFunction.NAME: - return new ActiveBeTasksTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index f5244005c1..5b4c15e876 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -87,7 +87,7 @@ public class RefreshCatalogTest extends TestWithFeService { List dbNames2 = test1.getDbNames(); Assertions.assertEquals(4, dbNames2.size()); ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(27, infoDb.getTables().size()); + Assertions.assertEquals(28, infoDb.getTables().size()); TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); @@ -96,7 +96,7 @@ public class RefreshCatalogTest extends TestWithFeService { CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class); test1 = mgr2.getCatalog("test1"); infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(27, 
infoDb.getTables().size()); + Assertions.assertEquals(28, infoDb.getTables().size()); testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index cde4cb043a..dd55e8e6f6 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -125,7 +125,8 @@ enum TSchemaTableType { SCH_COLUMN_STATISTICS, SCH_PARAMETERS, SCH_METADATA_NAME_IDS, - SCH_PROFILING; + SCH_PROFILING, + SCH_BACKEND_ACTIVE_TASKS; } enum THdfsCompression { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index a1e414384d..11ec1093da 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -711,8 +711,7 @@ enum TMetadataType { JOBS, TASKS, QUERIES, - WORKLOAD_SCHED_POLICY, - ACTIVE_BE_TASKS, + WORKLOAD_SCHED_POLICY } enum TIcebergQueryType { diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out index 0ad76d0db4..ff09873c18 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out @@ -23,6 +23,7 @@ mariadb_jdbc_catalog 115 abg -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 3520a11d8b..309c753f5d 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -192,6 +192,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 2 2 -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git 
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out index a66c418fed..fcbf4f9924 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out @@ -160,6 +160,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 123456789012345678901234567890123.12345 12345678901234567890123456789012.12345 1234567890123456789012345678901234.12345 123456789012345678901234567890123.12345 123456789012345678901234567890123456789012345678901234567890.12345 123456789012345678901234567890123456789012345678901234567890.12345 -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git a/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy new file mode 100644 index 0000000000..172a7bbf85 --- /dev/null +++ b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+suite("test_backend_active_tasks") {
+    def thread1 = new Thread({ // background worker; the suite body never joins it
+        while(true) { // NOTE(review): endless loop with no assertions; only exercises the queries
+            // non-pipeline: both pipeline engine switches turned off
+            sql "set experimental_enable_pipeline_engine=false"
+            sql "set experimental_enable_pipeline_x_engine=false"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+
+            // pipeline: pipeline engine on, pipeline_x engine off
+            sql "set experimental_enable_pipeline_engine=true"
+            sql "set experimental_enable_pipeline_x_engine=false"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+
+            // pipelinex: both pipeline engine switches turned on
+            sql "set experimental_enable_pipeline_engine=true"
+            sql "set experimental_enable_pipeline_x_engine=true"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+            Thread.sleep(1000) // wait one second before the next round
+        }
+    })
+    thread1.setDaemon(true) // daemon thread, so the endless loop does not keep the JVM alive
+    thread1.start()
+}