diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index bff59130e8..4f4273412c 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -26,6 +26,7 @@ #include #include +#include "exec/schema_scanner/schema_active_queries_scanner.h" #include "exec/schema_scanner/schema_backend_active_tasks.h" #include "exec/schema_scanner/schema_charsets_scanner.h" #include "exec/schema_scanner/schema_collations_scanner.h" @@ -152,6 +153,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaProfilingScanner::create_unique(); case TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS: return SchemaBackendActiveTasksScanner::create_unique(); + case TSchemaTableType::SCH_ACTIVE_QUERIES: + return SchemaActiveQueriesScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp new file mode 100644 index 0000000000..02dcd4d4a3 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_active_queries_scanner.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector SchemaActiveQueriesScanner::_s_tbls_columns = { + // name, type, size + {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true}, + {"START_TIME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"QUERY_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true}, + {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"DATABASE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"FRONTEND_INSTANCE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SQL", TYPE_STRING, sizeof(StringRef), true}}; + +SchemaActiveQueriesScanner::SchemaActiveQueriesScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ACTIVE_QUERIES) {} + +SchemaActiveQueriesScanner::~SchemaActiveQueriesScanner() {} + +Status SchemaActiveQueriesScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + + TQueriesMetadataParams tqueries_meta_params; + tqueries_meta_params.__set_relay_to_other_fe(true); + + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::QUERIES); + metadata_table_params.__set_queries_metadata_params(tqueries_meta_params); + for (int i = 0; i < _s_tbls_columns.size(); i++) { + metadata_table_params.__isset.columns_name = true; + metadata_table_params.columns_name.emplace_back(_s_tbls_columns[i].name); + } + + TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::SCHEMA_TABLE); + request.__set_metada_table_params(metadata_table_params); + + TFetchSchemaTableDataResult result; + + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout)); + + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch active queries from FE failed, errmsg=" << status; + return status; + } + std::vector result_data = result.data_batch; + + _active_query_block = vectorized::Block::create_unique(); + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _active_query_block->insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), data_type, _s_tbls_columns[i].name)); + } + + _active_query_block->reserve(_block_rows_limit); + + if (result_data.size() > 0) { + int col_size = result_data[0].column_value.size(); + if (col_size != _s_tbls_columns.size()) { + return Status::InternalError("active queries schema is not match for FE and BE"); + } + } + + // todo(wb) reuse this callback function + auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast(col_ptr)->insert_data(str_val.data(), + str_val.size()); + nullable_column->get_null_map_data().emplace_back(0); + }; + auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value( + int_val); + nullable_column->get_null_map_data().emplace_back(0); + }; + + for (int i = 0; i < result_data.size(); i++) { + TRow row = result_data[i]; + + insert_string_value(0, row.column_value[0].stringVal, _active_query_block.get()); + insert_string_value(1, row.column_value[1].stringVal, _active_query_block.get()); + insert_int_value(2, row.column_value[2].longVal, _active_query_block.get()); + insert_int_value(3, row.column_value[3].longVal, _active_query_block.get()); + insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get()); + insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get()); + insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get()); + } + return Status::OK(); +} + +Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_active_query_block == nullptr) { + RETURN_IF_ERROR(_get_active_queries_block_from_fe()); + _total_rows = _active_query_block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.h b/be/src/exec/schema_scanner/schema_active_queries_scanner.h new file mode 100644 index 0000000000..1df5b1f9d7 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaActiveQueriesScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaActiveQueriesScanner); + +public: + SchemaActiveQueriesScanner(); + ~SchemaActiveQueriesScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block(vectorized::Block* block, bool* eos) override; + + static std::vector _s_tbls_columns; + +private: + Status _get_active_queries_block_from_fe(); + + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr _active_query_block = nullptr; + int _rpc_timeout = 3000; +}; +}; // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 7a7d547e24..f3965a2f86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -70,7 +70,8 @@ public enum SchemaTableType { SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS), SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", TSchemaTableType.SCH_METADATA_NAME_IDS), SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING), - SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS); + SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS), + SCH_ACTIVE_QUERIES("ACTIVE_QUERIES", "ACTIVE_QUERIES", TSchemaTableType.SCH_ACTIVE_QUERIES); private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index ac1b31fcea..049e1b301e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; @@ -54,7 +53,6 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Hdfs.class, "hdfs"), tableValued(HttpStream.class, "http_stream"), tableValued(Numbers.class, "numbers"), - tableValued(ActiveQueries.class, "active_queries"), tableValued(S3.class, "s3"), tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 9e06841861..2ce7f5bb79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -458,6 +458,15 @@ public class SchemaTable extends Table { .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) .build())) + .put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA, + builder().column("QUERY_ID", ScalarType.createVarchar(256)) + .column("START_TIME", ScalarType.createVarchar(256)) + .column("QUERY_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("DATABASE", ScalarType.createVarchar(256)) + .column("FRONTEND_INSTANCE", ScalarType.createVarchar(256)) + .column("SQL", ScalarType.createStringType()) + .build())) .build(); protected SchemaTable(long id, String name, TableType type, List baseSchema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java deleted file mode 100644 index 1e15b67424..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.expressions.functions.table; - -import org.apache.doris.catalog.FunctionSignature; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.trees.expressions.Properties; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; -import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction; -import org.apache.doris.tablefunction.TableValuedFunctionIf; - -import java.util.Map; - -/** - * queries tvf - */ -public class ActiveQueries extends TableValuedFunction { - - public ActiveQueries(Properties properties) { - super("active_queries", properties); - } - - @Override - public FunctionSignature customSignature() { - return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); - } - - @Override - protected TableValuedFunctionIf toCatalogFunction() { - try { - Map arguments = getTVFProperties().getMap(); - return new ActiveQueriesTableValuedFunction(arguments); - } catch (Throwable t) { - throw new AnalysisException("Can not build ActiveQueriesTableValuedFunction by " - + this + ": " + t.getMessage(), t); - } - } - - @Override - public R accept(ExpressionVisitor visitor, C context) { - return visitor.visitQueries(this, context); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index fba34d4816..36e8ac365f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,7 +17,6 @@ package org.apache.doris.nereids.trees.expressions.visitor; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; @@ -91,10 +90,6 @@ public interface TableValuedFunctionVisitor { return visitTableValuedFunction(numbers, context); } - default R visitQueries(ActiveQueries queries, C context) { - return visitTableValuedFunction(queries, context); - } - default R visitS3(S3 s3, C context) { return visitTableValuedFunction(s3, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7f2b0db8c0..f4b4b74f93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2292,6 +2292,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { switch (request.getSchemaTableName()) { case METADATA_TABLE: return MetadataGenerator.getMetadataTable(request); + case SCHEMA_TABLE: + return MetadataGenerator.getSchemaTableData(request); default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 8a3df743e2..87aabe5bc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -18,8 +18,11 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; @@ -63,6 +66,8 @@ import org.apache.doris.thrift.TUserIdentity; import com.google.common.base.Stopwatch; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.Gson; import org.apache.iceberg.Snapshot; @@ -84,6 +89,25 @@ import java.util.concurrent.TimeUnit; public class MetadataGenerator { private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class); + private static final ImmutableList ACTIVE_QUERIES_SCHEMA = ImmutableList.of( + new Column("QUERY_ID", ScalarType.createStringType()), + new Column("START_TIME", ScalarType.createStringType()), + new Column("QUERY_TIME_MS", PrimitiveType.BIGINT), + new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT), + new Column("DATABASE", ScalarType.createStringType()), + new Column("FRONTEND_INSTANCE", ScalarType.createStringType()), + new Column("SQL", ScalarType.createStringType())); + + private static final ImmutableMap ACTIVE_QUERIES_COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < ACTIVE_QUERIES_SCHEMA.size(); i++) { + builder.put(ACTIVE_QUERIES_SCHEMA.get(i).getName().toLowerCase(), i); + } + ACTIVE_QUERIES_COLUMN_TO_INDEX = builder.build(); + } + public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException { if (!request.isSetMetadaTableParams() || !request.getMetadaTableParams().isSetMetadataType()) { return errorResult("Metadata table params is not set. "); @@ -118,9 +142,6 @@ public class MetadataGenerator { case TASKS: result = taskMetadataResult(params); break; - case QUERIES: - result = queriesMetadataResult(params, request); - break; case WORKLOAD_SCHED_POLICY: result = workloadSchedPolicyMetadataResult(params); break; @@ -133,6 +154,29 @@ public class MetadataGenerator { return result; } + public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDataRequest request) + throws TException { + if (!request.isSetMetadaTableParams() || !request.getMetadaTableParams().isSetMetadataType()) { + return errorResult("Metadata table params is not set. "); + } + TFetchSchemaTableDataResult result; + TMetadataTableRequestParams params = request.getMetadaTableParams(); + ImmutableMap columnIndex; + // todo(wb) move workload group/workload scheduler policy here + switch (request.getMetadaTableParams().getMetadataType()) { + case QUERIES: + result = queriesMetadataResult(params, request); + columnIndex = ACTIVE_QUERIES_COLUMN_TO_INDEX; + break; + default: + return errorResult("schema table params is not set."); + } + if (result.getStatus().getStatusCode() == TStatusCode.OK) { + filterColumns(result, params.getColumnsName(), columnIndex); + } + return result; + } + @NotNull public static TFetchSchemaTableDataResult errorResult(String msg) { TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); @@ -594,6 +638,27 @@ public class MetadataGenerator { result.setDataBatch(filterColumnsRows); } + private static void filterColumns(TFetchSchemaTableDataResult result, + List columnNames, + ImmutableMap columnIndex) throws TException { + List fullColumnsRow = result.getDataBatch(); + List filterColumnsRows = Lists.newArrayList(); + for (TRow row : fullColumnsRow) { + TRow filterRow = new TRow(); + try { + for (String columnName : columnNames) { + Integer index = columnIndex.get(columnName.toLowerCase()); + filterRow.addToColumnValue(row.getColumnValue().get(index)); + } + } catch (Throwable e) { + LOG.info("error happens when filter columns.", e); + throw new TException(e); + } + filterColumnsRows.add(filterRow); + } + result.setDataBatch(filterColumnsRows); + } + private static long convertToDateTimeV2( int year, int month, int day, int hour, int minute, int second, int microsecond) { return (long) microsecond | (long) second << 20 | (long) minute << 26 | (long) hour << 32 diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index e5e478d039..1947c241ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf { + public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName, TMetadataTableRequestParams params) throws AnalysisException { @@ -49,8 +50,6 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case TASKS: return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); - case QUERIES: - return ActiveQueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); case WORKLOAD_SCHED_POLICY: return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName); default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index f9fb76a966..2c63cdca1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -74,8 +74,6 @@ public abstract class TableValuedFunctionIf { return new TasksTableValuedFunction(params); case GroupCommitTableValuedFunction.NAME: return new GroupCommitTableValuedFunction(params); - case ActiveQueriesTableValuedFunction.NAME: - return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); default: diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 5b4c15e876..4e327644bd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -87,7 +87,7 @@ public class RefreshCatalogTest extends TestWithFeService { List dbNames2 = test1.getDbNames(); Assertions.assertEquals(4, dbNames2.size()); ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(28, infoDb.getTables().size()); + Assertions.assertEquals(29, infoDb.getTables().size()); TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); @@ -96,7 +96,7 @@ public class RefreshCatalogTest extends TestWithFeService { CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class); test1 = mgr2.getCatalog("test1"); infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(28, infoDb.getTables().size()); + Assertions.assertEquals(29, infoDb.getTables().size()); testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index dd55e8e6f6..760fd979bc 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -126,7 +126,8 @@ enum TSchemaTableType { SCH_PARAMETERS, SCH_METADATA_NAME_IDS, SCH_PROFILING, - SCH_BACKEND_ACTIVE_TASKS; + SCH_BACKEND_ACTIVE_TASKS, + SCH_ACTIVE_QUERIES; } enum THdfsCompression { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 17bc432d80..79be4de149 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -900,7 +900,8 @@ struct TInitExternalCtlMetaResult { enum TSchemaTableName { // BACKENDS = 0, - METADATA_TABLE = 1, + METADATA_TABLE = 1, // tvf + SCHEMA_TABLE = 2, // db information_schema's table } struct TMetadataTableRequestParams { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 8a92a98dab..205b22414e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -494,7 +494,7 @@ struct TTasksMetadataParams { struct TQueriesMetadataParams { 1: optional string cluster_name - 2: optional bool relay_to_other_fe + 2: optional bool relay_to_other_fe 3: optional TMaterializedViewsMetadataParams materialized_views_params 4: optional TJobsMetadataParams jobs_params 5: optional TTasksMetadataParams tasks_params diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out index ff09873c18..8023cf076c 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out @@ -23,6 +23,7 @@ mariadb_jdbc_catalog 115 abg -- !information_schema -- +active_queries backend_active_tasks character_sets collations diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 309c753f5d..f03a7591cf 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -192,6 +192,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 2 2 -- !information_schema -- +active_queries backend_active_tasks character_sets collations diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out index fcbf4f9924..8cf9d01b6e 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out @@ -160,6 +160,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 123456789012345678901234567890123.12345 12345678901234567890123456789012.12345 1234567890123456789012345678901234.12345 123456789012345678901234567890123.12345 123456789012345678901234567890123456789012345678901234567890.12345 123456789012345678901234567890123456789012345678901234567890.12345 -- !information_schema -- +active_queries backend_active_tasks character_sets collations diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out index 85b5ed1393..5e63f90319 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out @@ -192,6 +192,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 2 2 -- !information_schema -- +active_queries backend_active_tasks character_sets collations diff --git a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy index eaf30402e3..701db9afd5 100644 --- a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy @@ -31,7 +31,7 @@ suite("test_queries_tvf","p0,external,tvf,external_docker") { sql """select * from ${table_name};""" - def res = sql """ select QueryId from active_queries() where `Sql` like "%${table_name}%"; """ + def res = sql """ select query_id from information_schema.active_queries where `sql` like "%${table_name}%"; """ logger.info("res = " + res) assertTrue(res.size() >= 0 && res.size() <= 2); } \ No newline at end of file diff --git a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy index ae188637ec..d1208122ed 100644 --- a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy +++ b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy @@ -30,7 +30,7 @@ suite("nereids_tvf") { """ sql """ - select QueryId from active_queries() where `Sql` like "%test_queries_tvf%"; + select query_id from information_schema.active_queries where `sql` like "%test_queries_tvf%"; """ sql """ diff --git a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy new file mode 100644 index 0000000000..eecba3d063 --- /dev/null +++ b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_active_queries") { + def thread1 = new Thread({ + while(true) { + // non-pipeline + sql "set experimental_enable_pipeline_engine=false" + sql "set experimental_enable_pipeline_x_engine=false" + sql "select * from information_schema.active_queries" + sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + + // pipeline + sql "set experimental_enable_pipeline_engine=true" + sql "set experimental_enable_pipeline_x_engine=false" + sql "select * from information_schema.active_queries" + sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + + // pipelinex + sql "set experimental_enable_pipeline_engine=true" + sql "set experimental_enable_pipeline_x_engine=true" + sql "select * from information_schema.active_queries" + sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + Thread.sleep(1000) + } + }) + thread1.setDaemon(true) + thread1.start() +} + diff --git a/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy b/regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy similarity index 100% rename from regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy rename to regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy