[fix](Metadata tvf) Metadata TVF supports read the specified columns from Fe (#19110)

This commit is contained in:
Tiewei Fang
2023-04-29 00:06:08 +08:00
committed by GitHub
parent d006143330
commit c74c2a4f8e
9 changed files with 138 additions and 10 deletions

View File

@ -115,7 +115,6 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> im
initialized = true;
}
// TODO(ftw): drew
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();

View File

@ -108,7 +108,6 @@ public class TestExternalDatabase extends ExternalDatabase<TestExternalTable> im
initialized = true;
}
// TODO(ftw): drew
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();

View File

@ -25,6 +25,7 @@ import org.apache.doris.thrift.TBackendsMetadataParams;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
@ -37,6 +38,39 @@ import java.util.Map;
public class BackendsTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "backends";
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX = new ImmutableMap.Builder<String, Integer>()
.put("backendid", 0)
.put("cluster", 1)
.put("ip", 2)
.put("hostname", 3)
.put("heartbeatport", 4)
.put("beport", 5)
.put("httpport", 6)
.put("brpcport", 7)
.put("laststarttime", 8)
.put("lastheartbeat", 9)
.put("alive", 10)
.put("systemdecommissioned", 11)
.put("clusterdecommissioned", 12)
.put("tabletnum", 13)
.put("datausedcapacity", 14)
.put("availcapacity", 15)
.put("totalcapacity", 16)
.put("usedpct", 17)
.put("maxdiskusedpct", 18)
.put("remoteusedcapacity", 19)
.put("tag", 20)
.put("errmsg", 21)
.put("version", 22)
.put("status", 23)
.put("heartbeatfailurecounter", 24)
.put("noderole", 25)
.build();
public static Integer getColumnIndexFromColumnName(String columnName) {
return COLUMN_TO_INDEX.get(columnName.toLowerCase());
}
public BackendsTableValuedFunction(Map<String, String> params) throws AnalysisException {
if (params.size() != 0) {
throw new AnalysisException("backends table-valued-function does not support any params");

View File

@ -31,6 +31,7 @@ import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@ -50,6 +51,18 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX = new ImmutableMap.Builder<String, Integer>()
.put("committed_at", 0)
.put("snapshot_id", 1)
.put("parent_id", 2)
.put("operation", 3)
.put("manifest_list", 4)
.build();
public static Integer getColumnIndexFromColumnName(String columnName) {
return COLUMN_TO_INDEX.get(columnName.toLowerCase());
}
private TIcebergQueryType queryType;
// here tableName represents the name of a table in Iceberg.
@ -82,7 +95,6 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
this.icebergTableName.getDb() + ": " + this.icebergTableName.getTbl());
}
try {
// TODO(ftw): check here
this.queryType = TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
} catch (IllegalArgumentException e) {
throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType);

View File

@ -33,6 +33,7 @@ import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@ -63,17 +64,25 @@ public class MetadataGenerator {
if (!request.isSetMetadaTableParams()) {
return errorResult("Metadata table params is not set. ");
}
TFetchSchemaTableDataResult result;
TMetadataTableRequestParams params = request.getMetadaTableParams();
switch (request.getMetadaTableParams().getMetadataType()) {
case ICEBERG:
return icebergMetadataResult(request.getMetadaTableParams());
case BACKENDS:
return backendsMetadataResult(request.getMetadaTableParams());
case RESOURCE_GROUPS:
return resourceGroupsMetadataResult(request.getMetadaTableParams());
default:
result = icebergMetadataResult(params);
break;
case BACKENDS:
result = backendsMetadataResult(params);
break;
case RESOURCE_GROUPS:
result = resourceGroupsMetadataResult(params);
break;
default:
return errorResult("Metadata table params is not set.");
}
return errorResult("Metadata table params is not set. ");
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
filterColumns(result, params.getColumnsName(), params.getMetadataType());
}
return result;
}
@NotNull
@ -119,6 +128,7 @@ public class MetadataGenerator {
}
trow.addToColumnValue(new TCell().setStringVal(snapshot.operation()));
trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation()));
dataBatch.add(trow);
}
break;
@ -232,6 +242,7 @@ public class MetadataGenerator {
// node role, show the value only when backend is alive.
trow.addToColumnValue(new TCell().setStringVal(backend.isAlive() ? backend.getNodeRoleTag().value : ""));
dataBatch.add(trow);
}
@ -265,6 +276,34 @@ public class MetadataGenerator {
return result;
}
private static void filterColumns(TFetchSchemaTableDataResult result,
List<String> columnNames, TMetadataType type) {
List<TRow> fullColumnsRow = result.getDataBatch();
List<TRow> filterColumnsRows = Lists.newArrayList();
for (TRow row : fullColumnsRow) {
TRow filterRow = new TRow();
for (String columnName : columnNames) {
Integer index = 0;
switch (type) {
case ICEBERG:
index = IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName);
break;
case BACKENDS:
index = BackendsTableValuedFunction.getColumnIndexFromColumnName(columnName);
break;
case RESOURCE_GROUPS:
index = ResourceGroupsTableValuedFunction.getColumnIndexFromColumnName(columnName);
break;
default:
break;
}
filterRow.addToColumnValue(row.getColumnValue().get(index));
}
filterColumnsRows.add(filterRow);
}
result.setDataBatch(filterColumnsRows);
}
private static org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog catalog, String db, String tbl)
throws MetaNotFoundException {
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();

View File

@ -24,6 +24,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
@ -35,6 +36,16 @@ import java.util.Map;
*/
public class ResourceGroupsTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "resource_groups";
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX = new ImmutableMap.Builder<String, Integer>()
.put("id", 0)
.put("name", 1)
.put("item", 2)
.put("value", 3)
.build();
public static Integer getColumnIndexFromColumnName(String columnName) {
return COLUMN_TO_INDEX.get(columnName.toLowerCase());
}
public ResourceGroupsTableValuedFunction(Map<String, String> params) throws AnalysisException {
if (params.size() != 0) {