[Refactor](exectuor)Add schema type table active_queries (#32057)

* Add schema type table active_queries
This commit is contained in:
wangbo
2024-03-13 10:55:41 +08:00
committed by yiguolei
parent 58675c271b
commit df5ec16d7c
24 changed files with 359 additions and 81 deletions

View File

@ -70,7 +70,8 @@ public enum SchemaTableType {
SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS),
SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", TSchemaTableType.SCH_METADATA_NAME_IDS),
SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING),
SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS);
SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS),
SCH_ACTIVE_QUERIES("ACTIVE_QUERIES", "ACTIVE_QUERIES", TSchemaTableType.SCH_ACTIVE_QUERIES);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;

View File

@ -17,7 +17,6 @@
package org.apache.doris.catalog;
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
@ -54,7 +53,6 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(Hdfs.class, "hdfs"),
tableValued(HttpStream.class, "http_stream"),
tableValued(Numbers.class, "numbers"),
tableValued(ActiveQueries.class, "active_queries"),
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),

View File

@ -458,6 +458,15 @@ public class SchemaTable extends Table {
.column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
.build()))
.put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createVarchar(256))
.column("START_TIME", ScalarType.createVarchar(256))
.column("QUERY_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATABASE", ScalarType.createVarchar(256))
.column("FRONTEND_INSTANCE", ScalarType.createVarchar(256))
.column("SQL", ScalarType.createStringType())
.build()))
.build();
protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) {

View File

@ -1,59 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/**
* queries tvf
*/
public class ActiveQueries extends TableValuedFunction {
public ActiveQueries(Properties properties) {
super("active_queries", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new ActiveQueriesTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build ActiveQueriesTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitQueries(this, context);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.doris.nereids.trees.expressions.visitor;
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
@ -91,10 +90,6 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(numbers, context);
}
default R visitQueries(ActiveQueries queries, C context) {
return visitTableValuedFunction(queries, context);
}
default R visitS3(S3 s3, C context) {
return visitTableValuedFunction(s3, context);
}

View File

@ -2292,6 +2292,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
switch (request.getSchemaTableName()) {
case METADATA_TABLE:
return MetadataGenerator.getMetadataTable(request);
case SCHEMA_TABLE:
return MetadataGenerator.getSchemaTableData(request);
default:
break;
}

View File

@ -18,8 +18,11 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
@ -63,6 +66,8 @@ import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.iceberg.Snapshot;
@ -84,6 +89,25 @@ import java.util.concurrent.TimeUnit;
public class MetadataGenerator {
private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class);
private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = ImmutableList.of(
new Column("QUERY_ID", ScalarType.createStringType()),
new Column("START_TIME", ScalarType.createStringType()),
new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
new Column("DATABASE", ScalarType.createStringType()),
new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
new Column("SQL", ScalarType.createStringType()));
private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX;
static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < ACTIVE_QUERIES_SCHEMA.size(); i++) {
builder.put(ACTIVE_QUERIES_SCHEMA.get(i).getName().toLowerCase(), i);
}
ACTIVE_QUERIES_COLUMN_TO_INDEX = builder.build();
}
public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
if (!request.isSetMetadaTableParams() || !request.getMetadaTableParams().isSetMetadataType()) {
return errorResult("Metadata table params is not set. ");
@ -118,9 +142,6 @@ public class MetadataGenerator {
case TASKS:
result = taskMetadataResult(params);
break;
case QUERIES:
result = queriesMetadataResult(params, request);
break;
case WORKLOAD_SCHED_POLICY:
result = workloadSchedPolicyMetadataResult(params);
break;
@ -133,6 +154,29 @@ public class MetadataGenerator {
return result;
}
public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDataRequest request)
throws TException {
if (!request.isSetMetadaTableParams() || !request.getMetadaTableParams().isSetMetadataType()) {
return errorResult("Metadata table params is not set. ");
}
TFetchSchemaTableDataResult result;
TMetadataTableRequestParams params = request.getMetadaTableParams();
ImmutableMap<String, Integer> columnIndex;
// todo(wb) move workload group/workload scheduler policy here
switch (request.getMetadaTableParams().getMetadataType()) {
case QUERIES:
result = queriesMetadataResult(params, request);
columnIndex = ACTIVE_QUERIES_COLUMN_TO_INDEX;
break;
default:
return errorResult("schema table params is not set.");
}
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
filterColumns(result, params.getColumnsName(), columnIndex);
}
return result;
}
@NotNull
public static TFetchSchemaTableDataResult errorResult(String msg) {
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
@ -594,6 +638,27 @@ public class MetadataGenerator {
result.setDataBatch(filterColumnsRows);
}
private static void filterColumns(TFetchSchemaTableDataResult result,
List<String> columnNames,
ImmutableMap<String, Integer> columnIndex) throws TException {
List<TRow> fullColumnsRow = result.getDataBatch();
List<TRow> filterColumnsRows = Lists.newArrayList();
for (TRow row : fullColumnsRow) {
TRow filterRow = new TRow();
try {
for (String columnName : columnNames) {
Integer index = columnIndex.get(columnName.toLowerCase());
filterRow.addToColumnValue(row.getColumnValue().get(index));
}
} catch (Throwable e) {
LOG.info("error happens when filter columns.", e);
throw new TException(e);
}
filterColumnsRows.add(filterRow);
}
result.setDataBatch(filterColumnsRows);
}
private static long convertToDateTimeV2(
int year, int month, int day, int hour, int minute, int second, int microsecond) {
return (long) microsecond | (long) second << 20 | (long) minute << 26 | (long) hour << 32

View File

@ -27,6 +27,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf {
public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName,
TMetadataTableRequestParams params)
throws AnalysisException {
@ -49,8 +50,6 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf
return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case TASKS:
return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
return ActiveQueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
case WORKLOAD_SCHED_POLICY:
return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:

View File

@ -74,8 +74,6 @@ public abstract class TableValuedFunctionIf {
return new TasksTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
case ActiveQueriesTableValuedFunction.NAME:
return new ActiveQueriesTableValuedFunction(params);
case WorkloadSchedPolicyTableValuedFunction.NAME:
return new WorkloadSchedPolicyTableValuedFunction(params);
default: