[feature](meta) queries as table valued function (#25052)

1. Add queries view as table function.
2. Proxy result to other FEs and return merged results back to BE.

Co-authored-by: yiguolei <676222867@qq.com>
This commit is contained in:
Nitin-Kashyap
2023-10-12 13:56:14 +05:30
committed by GitHub
parent 1c3ecbbae9
commit bdb64eab73
10 changed files with 250 additions and 0 deletions

View File

@ -231,6 +231,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::CATALOGS:
RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::QUERIES:
RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request));
break;
default:
_meta_eos = true;
return Status::OK();
@ -372,6 +375,25 @@ Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta
return Status::OK();
}
// Fills `request` so that it asks for the METADATA_TABLE schema table with
// metadata type QUERIES, passing along the queries_params found in
// `meta_scan_range`.
//
// Returns Status::InternalError when `meta_scan_range` has no queries_params
// set, Status::OK() otherwise.
Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
                                                     TFetchSchemaTableDataRequest* request) {
    VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request";

    const bool has_queries_params = meta_scan_range.__isset.queries_params;
    if (!has_queries_params) {
        return Status::InternalError("Can not find TQueriesMetadataParams from meta_scan_range.");
    }

    // Describe which metadata is wanted and hand the scan-range params through unchanged.
    TMetadataTableRequestParams queries_table_params;
    queries_table_params.__set_queries_metadata_params(meta_scan_range.queries_params);
    queries_table_params.__set_metadata_type(TMetadataType::QUERIES);

    // Populate the outgoing fetch request.
    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
    request->__set_cluster_name("");
    request->__set_metada_table_params(queries_table_params);

    return Status::OK();
}
Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
RETURN_IF_ERROR(VScanner::close(state));

View File

@ -81,6 +81,8 @@ private:
TFetchSchemaTableDataRequest* request);
Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
// Builds the schema-table fetch request for the QUERIES metadata type from
// the queries_params carried by meta_scan_range.
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
TupleId _tuple_id;
TUserIdentity _user_identity;

View File

@ -19,6 +19,7 @@ package org.apache.doris.common.proc;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.DiskUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
@ -87,6 +88,23 @@ public class FrontendsProcNode implements ProcNodeInterface {
}
}
/**
 * Lists the (host, rpc port) pair of every frontend returned by {@code env.getFrontends(null)}.
 *
 * <p>The "self" host is the FE IP recorded on the current {@link ConnectContext} when a context
 * exists and that IP is non-empty; otherwise it is the host of the current Env's self node.
 *
 * @param env environment whose frontends are enumerated
 * @param includeSelf when {@code false}, frontends whose host equals the self host are omitted
 * @return a newly created list of host / rpc-port pairs
 */
public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env, boolean includeSelf) {
    List<Frontend> knownFrontends = env.getFrontends(null);

    // Default to this FE's own node; prefer the FE the client is connected to when known.
    String selfHost = Env.getCurrentEnv().getSelfNode().getHost();
    ConnectContext context = ConnectContext.get();
    if (context != null) {
        String connectedFeIp = context.getCurrentConnectedFEIp();
        if (!Strings.isNullOrEmpty(connectedFeIp)) {
            selfHost = connectedFeIp;
        }
    }

    List<Pair<String, Integer>> hostAndPort = new ArrayList<>();
    for (Frontend frontend : knownFrontends) {
        if (frontend.getHost().equals(selfHost) && !includeSelf) {
            continue;
        }
        hostAndPort.add(Pair.of(frontend.getHost(), frontend.getRpcPort()));
    }
    return hostAndPort;
}
public static void getFrontendsInfo(Env env, List<List<String>> infos) {
InetSocketAddress master = null;
try {

View File

@ -19,16 +19,23 @@ package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryDetail;
import org.apache.doris.qe.QueryDetailQueue;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBackendsMetadataParams;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
@ -37,6 +44,8 @@ import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@ -57,6 +66,7 @@ import org.jetbrains.annotations.NotNull;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -92,6 +102,9 @@ public class MetadataGenerator {
case CATALOGS:
result = catalogsMetadataResult(params);
break;
case QUERIES:
result = queriesMetadataResult(params, request);
break;
default:
return errorResult("Metadata table params is not set.");
}
@ -352,6 +365,96 @@ public class MetadataGenerator {
return result;
}
/**
 * Produces the rows of the {@code queries} metadata table.
 *
 * <p>One row is emitted per {@link QueryDetail} held by this frontend, with cells in the order
 * query id, start time, end time, event time, latency, state, database, sql, frontend host.
 * For a RUNNING query the latency cell is the time elapsed since its start; otherwise it is the
 * recorded latency.
 *
 * <p>When {@code relay_to_other_fe} is set, a copy of the request with that flag cleared is
 * forwarded to the other frontends and their rows are appended. Clearing the flag keeps the
 * receiving frontends from relaying again.
 *
 * @param params metadata table params; must carry queries metadata params
 * @param parentRequest the request being served, used as the template for the relayed request
 * @return an OK result with the merged rows, or an error result when the queries params are unset
 */
private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
        TFetchSchemaTableDataRequest parentRequest) {
    if (!params.isSetQueriesMetadataParams()) {
        return errorResult("queries metadata param is not set.");
    }

    TQueriesMetadataParams queriesMetadataParams = params.getQueriesMetadataParams();
    TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();

    // Name reported in the last column: the FE the client is connected to when known,
    // otherwise this FE's own node, passed through NetUtils.getHostnameByIp.
    String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
    if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
        selfNode = ConnectContext.get().getCurrentConnectedFEIp();
    }
    selfNode = NetUtils.getHostnameByIp(selfNode);

    List<TRow> dataBatch = Lists.newArrayList();
    // NOTE(review): 0L presumably means "no lower bound on event time" - confirm in QueryDetailQueue.
    List<QueryDetail> queries = QueryDetailQueue.getQueryDetails(0L);
    for (QueryDetail query : queries) {
        TRow trow = new TRow();
        trow.addToColumnValue(new TCell().setStringVal(query.getQueryId()));
        trow.addToColumnValue(new TCell().setLongVal(query.getStartTime()));
        trow.addToColumnValue(new TCell().setLongVal(query.getEndTime()));
        trow.addToColumnValue(new TCell().setLongVal(query.getEventTime()));
        if (query.getState() == QueryDetail.QueryMemState.RUNNING) {
            // Still running: report the time elapsed so far instead of a final latency.
            trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - query.getStartTime()));
        } else {
            trow.addToColumnValue(new TCell().setLongVal(query.getLatency()));
        }
        trow.addToColumnValue(new TCell().setStringVal(query.getState().toString()));
        trow.addToColumnValue(new TCell().setStringVal(query.getDatabase()));
        trow.addToColumnValue(new TCell().setStringVal(query.getSql()));
        trow.addToColumnValue(new TCell().setStringVal(selfNode));
        dataBatch.add(trow);
    }

    /* Get the query results from other FE also */
    if (queriesMetadataParams.isRelayToOtherFe()) {
        // Copy the request and clear the relay flag so the receiving FEs answer locally only.
        TFetchSchemaTableDataRequest relayRequest = new TFetchSchemaTableDataRequest(parentRequest);
        TMetadataTableRequestParams relayParams = new TMetadataTableRequestParams(params);
        TQueriesMetadataParams relayQueryParams = new TQueriesMetadataParams(queriesMetadataParams);
        relayQueryParams.setRelayToOtherFe(false);
        relayParams.setQueriesMetadataParams(relayQueryParams);
        relayRequest.setMetadaTableParams(relayParams);

        for (TFetchSchemaTableDataResult relayResult : forwardToOtherFrontends(relayRequest)) {
            // Thrift getters return null for unset fields, so a reply without rows
            // (for example an error reply) must be skipped rather than dereferenced.
            List<TRow> relayRows = relayResult.getDataBatch();
            if (relayRows == null) {
                LOG.warn("Ignore queries metadata reply without data batch from another frontend. status: {}",
                        relayResult.getStatus());
                continue;
            }
            dataBatch.addAll(relayRows);
        }
    }

    result.setDataBatch(dataBatch);
    result.setStatus(new TStatus(TStatusCode.OK));
    return result;
}
/**
 * Sends {@code request} to every other frontend, one after another, and gathers the replies.
 *
 * <p>A frontend whose client cannot be borrowed from the pool, or whose call throws, is logged
 * and left out of the returned list. A client is returned to the pool after a successful call
 * and invalidated otherwise.
 *
 * @param request the fetch request to send to each frontend
 * @return the results of the frontends that answered, in iteration order
 */
private static List<TFetchSchemaTableDataResult> forwardToOtherFrontends(TFetchSchemaTableDataRequest request) {
    List<TFetchSchemaTableDataResult> collected = new ArrayList<>();
    List<Pair<String, Integer>> peers = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(), false);

    // Timeout in seconds: the session's exec timeout when a context exists, else 300.
    ConnectContext context = ConnectContext.get();
    int waitSeconds = (context != null) ? context.getExecTimeout() : 300;

    for (Pair<String, Integer> peer : peers) {
        TNetworkAddress peerAddress = new TNetworkAddress(peer.key(), peer.value());

        FrontendService.Client peerClient;
        try {
            peerClient = ClientPool.frontendPool.borrowObject(peerAddress, waitSeconds * 1000);
        } catch (Exception e) {
            LOG.warn("Failed to get frontend {} client. exception: {}", peer.key(), e);
            continue;
        }

        boolean callSucceeded = false;
        try {
            collected.add(peerClient.fetchSchemaTableData(request));
            callSucceeded = true;
        } catch (Exception e) {
            LOG.warn("Failed to finish forward fetch operation to fe: {} . exception: {}", peer.key(), e);
        } finally {
            if (!callSucceeded) {
                ClientPool.frontendPool.invalidateObject(peerAddress, peerClient);
            } else {
                ClientPool.frontendPool.returnObject(peerAddress, peerClient);
            }
        }
    }
    return collected;
}
private static void filterColumns(TFetchSchemaTableDataResult result,
List<String> columnNames, TMetadataType type) throws TException {
List<TRow> fullColumnsRow = result.getDataBatch();

View File

@ -41,6 +41,8 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf
return WorkloadGroupsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case CATALOGS:
return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case QUERIES:
return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata TableValuedFunction type");
}

View File

@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TQueriesMetadataParams;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
/**
 * The {@code queries()} table-valued function.
 *
 * <p>Declares the schema of the queries metadata table and the scan range that asks for
 * {@link TMetadataType#QUERIES} metadata with relaying to other frontends enabled.
 */
public class QueriesTableValuedFunction extends MetadataTableValuedFunction {
    public static final String NAME = "queries";

    // NOTE(review): the boolean third constructor argument presumably marks the column
    // as nullable - confirm against Column.
    private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
            new Column("QueryId", ScalarType.createStringType()),
            new Column("StartTime", PrimitiveType.BIGINT),
            new Column("EndTime", PrimitiveType.BIGINT, true),
            new Column("EventTime", PrimitiveType.BIGINT, true),
            new Column("Latency", PrimitiveType.BIGINT),
            new Column("State", ScalarType.createStringType()),
            new Column("Database", ScalarType.createStringType(), true),
            new Column("Sql", ScalarType.createStringType()),
            new Column("FrontendInstance", ScalarType.createStringType()));

    /** Lower-cased column name to its position in {@link #SCHEMA}. */
    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

    static {
        // Typed factory instead of the raw-type "new ImmutableMap.Builder()".
        ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
        for (int i = 0; i < SCHEMA.size(); i++) {
            builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
        }
        COLUMN_TO_INDEX = builder.build();
    }

    /**
     * Looks up a column's position in the schema, ignoring case.
     *
     * @param columnName column name in any case
     * @return the zero-based index, or {@code null} when no such column exists
     */
    public static Integer getColumnIndexFromColumnName(String columnName) {
        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
    }

    /**
     * Creates the function.
     *
     * @param params function arguments; must be empty
     * @throws AnalysisException when any argument is supplied
     */
    public QueriesTableValuedFunction(Map<String, String> params) throws AnalysisException {
        if (!params.isEmpty()) {
            throw new AnalysisException("Queries table-valued-function does not support any params");
        }
    }

    @Override
    public TMetadataType getMetadataType() {
        return TMetadataType.QUERIES;
    }

    /**
     * Builds the scan range: metadata type QUERIES, an empty cluster name, and
     * {@code relay_to_other_fe} set to true.
     */
    @Override
    public TMetaScanRange getMetaScanRange() {
        TMetaScanRange metaScanRange = new TMetaScanRange();
        metaScanRange.setMetadataType(TMetadataType.QUERIES);
        TQueriesMetadataParams queriesMetadataParams = new TQueriesMetadataParams();
        queriesMetadataParams.setClusterName("");
        queriesMetadataParams.setRelayToOtherFe(true);
        metaScanRange.setQueriesParams(queriesMetadataParams);
        return metaScanRange;
    }

    @Override
    public String getTableName() {
        return "QueriesTableValuedFunction";
    }

    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        return SCHEMA;
    }
}

View File

@ -68,6 +68,8 @@ public abstract class TableValuedFunctionIf {
return new CatalogsTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:
return new QueriesTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}

View File

@ -884,6 +884,7 @@ struct TMetadataTableRequestParams {
4: optional list<string> columns_name
5: optional PlanNodes.TFrontendsMetadataParams frontends_metadata_params
6: optional Types.TUserIdentity current_user_ident
7: optional PlanNodes.TQueriesMetadataParams queries_metadata_params
}
struct TFetchSchemaTableDataRequest {

View File

@ -469,11 +469,17 @@ struct TFrontendsMetadataParams {
1: optional string cluster_name
}
// Parameters of a scan over the "queries" metadata table.
struct TQueriesMetadataParams {
// The FE's QueriesTableValuedFunction sets this to the empty string.
1: optional string cluster_name
// When true, the FE serving the fetch also forwards the request to the other
// FEs and merges their rows; the forwarded copy carries false so that the
// receiving FEs do not relay again.
2: optional bool relay_to_other_fe
}
// Scan range of a metadata table scan: the metadata type being scanned plus
// one optional parameter struct per metadata type.
struct TMetaScanRange {
  1: optional Types.TMetadataType metadata_type
  2: optional TIcebergMetadataParams iceberg_params
  3: optional TBackendsMetadataParams backends_params
  4: optional TFrontendsMetadataParams frontends_params
  // Read by the BE meta scanner when metadata_type is QUERIES.
  5: optional TQueriesMetadataParams queries_params
}
// Specification of an individual data range which is held in its entirety

View File

@ -692,6 +692,7 @@ enum TMetadataType {
FRONTENDS,
CATALOGS,
FRONTENDS_DISKS,
QUERIES,
}
enum TIcebergQueryType {