[improvement](Variant Type) Support displaying subcolumns expanded for the variant column (#27764)

This commit is contained in:
Sun Chenyang
2023-12-08 20:34:58 +08:00
committed by GitHub
parent 51f320a606
commit 573b594df3
23 changed files with 1163 additions and 7 deletions

View File

@ -92,6 +92,7 @@ public class DescribeStmt extends ShowStmt {
private TableName dbTableName;
private ProcNodeInterface node;
private PartitionNames partitionNames;
List<List<String>> totalRows = new LinkedList<List<String>>();
@ -106,6 +107,12 @@ public class DescribeStmt extends ShowStmt {
this.isAllTables = isAllTables;
}
public DescribeStmt(TableName dbTableName, boolean isAllTables, PartitionNames partitionNames) {
this.dbTableName = dbTableName;
this.isAllTables = isAllTables;
this.partitionNames = partitionNames;
}
public DescribeStmt(TableValuedFunctionRef tableValuedFunctionRef) {
this.tableValuedFunctionRef = tableValuedFunctionRef;
this.isTableValuedFunction = true;
@ -156,6 +163,13 @@ public class DescribeStmt extends ShowStmt {
return;
}
if (partitionNames != null) {
partitionNames.analyze(analyzer);
if (partitionNames.isTemp()) {
throw new AnalysisException("Do not support temp partitions");
}
}
dbTableName.analyze(analyzer);
if (!Env.getCurrentEnv().getAccessManager()
@ -178,9 +192,22 @@ public class DescribeStmt extends ShowStmt {
if (table.getType() == TableType.OLAP) {
procString += ((OlapTable) table).getBaseIndexId();
} else {
if (partitionNames != null) {
throw new AnalysisException(dbTableName.getTbl()
+ " is not a OLAP table, describe table failed");
}
procString += table.getId();
}
if (partitionNames != null) {
procString += "/";
StringBuilder builder = new StringBuilder();
for (String str : partitionNames.getPartitionNames()) {
builder.append(str);
builder.append(",");
}
builder.deleteCharAt(builder.length() - 1);
procString += builder.toString();
}
node = ProcService.getInstance().open(procString);
if (node == null) {
throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed");

View File

@ -23,7 +23,9 @@ import com.google.common.collect.Lists;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public enum AggregateType {
SUM("SUM"),
@ -39,6 +41,20 @@ public enum AggregateType {
private static EnumMap<AggregateType, EnumSet<PrimitiveType>> compatibilityMap;
private static final Map<String, AggregateType> aggTypeMap = new HashMap<>();
static {
aggTypeMap.put("NONE", AggregateType.NONE);
aggTypeMap.put("SUM", AggregateType.SUM);
aggTypeMap.put("MIN", AggregateType.MIN);
aggTypeMap.put("MAX", AggregateType.MAX);
aggTypeMap.put("REPLACE", AggregateType.REPLACE);
aggTypeMap.put("REPLACE_IF_NOT_NULL", AggregateType.REPLACE_IF_NOT_NULL);
aggTypeMap.put("HLL_UNION", AggregateType.HLL_UNION);
aggTypeMap.put("BITMAP_UNION", AggregateType.BITMAP_UNION);
aggTypeMap.put("QUANTILE_UNION", AggregateType.QUANTILE_UNION);
}
static {
compatibilityMap = new EnumMap<>(AggregateType.class);
List<PrimitiveType> primitiveTypeList = Lists.newArrayList();
@ -181,4 +197,8 @@ public enum AggregateType {
return null;
}
}
public static AggregateType getAggTypeFromAggName(String typeName) {
return aggTypeMap.get(typeName);
}
}

View File

@ -2423,4 +2423,14 @@ public class OlapTable extends Table {
}
return false;
}
public List<Tablet> getAllTablets() throws AnalysisException {
List<Tablet> tablets = Lists.newArrayList();
for (Partition partition : getPartitions()) {
for (Tablet tablet : partition.getBaseIndex().getTablets()) {
tablets.add(tablet);
}
}
return tablets;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.qe.SessionVariable;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -128,6 +129,10 @@ public class IndexInfoProcDir implements ProcDirInterface {
throw new AnalysisException("Index " + idxId + " does not exist");
}
bfColumns = olapTable.getCopiedBfColumns();
if (olapTable.hasVariantColumns()
&& SessionVariable.enableDescribeExtendVariantColumn()) {
return new RemoteIndexSchemaProcDir(table, schema, bfColumns);
}
} else {
schema = table.getBaseSchema();
}

View File

@ -49,10 +49,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface {
this.bfColumns = bfColumns;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
public static ProcResult createResult(List<Column> schema, Set<String> bfColumns) throws AnalysisException {
Preconditions.checkNotNull(schema);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
@ -105,4 +103,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface {
return result;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
return createResult(this.schema, this.bfColumns);
}
}

View File

@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
/*
* SHOW PROC /dbs/dbId/tableId/index_schema/indexId"
* show index schema
*/
public class RemoteIndexSchemaProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Field").add("Type").add("Null").add("Key")
.add("Default").add("Extra")
.build();
private List<Column> schema;
private Set<String> bfColumns;
private TableIf table;
public RemoteIndexSchemaProcDir(TableIf table, List<Column> schema, Set<String> bfColumns) {
this.table = table;
this.schema = schema;
this.bfColumns = bfColumns;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(schema);
List<Tablet> tablets = null;
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
tablets = olapTable.getAllTablets();
} finally {
table.readUnlock();
}
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
if (remoteSchema == null || remoteSchema.isEmpty()) {
throw new AnalysisException("fetch remote tablet schema failed");
}
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
}
@Override
public boolean register(String name, ProcNodeInterface node) {
return false;
}
@Override
public ProcNodeInterface lookup(String partitionString) throws AnalysisException {
Preconditions.checkNotNull(table);
List<String> partitionNameList = new ArrayList<String>(Arrays.asList(partitionString.split(",")));
if (partitionNameList == null || partitionNameList.isEmpty()) {
throw new AnalysisException("Describe table[" + table.getName() + "] failed");
}
List<Partition> partitions = Lists.newArrayList();
table.readLock();
try {
if (table.getType() == TableType.OLAP) {
OlapTable olapTable = (OlapTable) table;
for (String partitionName : partitionNameList) {
Partition partition = olapTable.getPartition(partitionName);
if (partition == null) {
throw new AnalysisException("Partition " + partitionName + " does not exist");
}
partitions.add(partition);
}
} else {
throw new AnalysisException(table.getName() + " is not a OLAP table, describe table failed");
}
} catch (Throwable t) {
throw new AnalysisException("Describe table[" + table.getName() + "] failed");
} finally {
table.readUnlock();
}
return new RemoteIndexSchemaProcNode(partitions, this.schema, this.bfColumns);
}
}

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
/*
* SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName"
* show index schema
*/
public class RemoteIndexSchemaProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Field").add("Type").add("Null").add("Key")
.add("Default").add("Extra")
.build();
private List<Partition> partitions;
private List<Column> schema;
private Set<String> bfColumns;
public RemoteIndexSchemaProcNode(List<Partition> partitions, List<Column> schema, Set<String> bfColumns) {
this.partitions = partitions;
this.schema = schema;
this.bfColumns = bfColumns;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(schema);
Preconditions.checkNotNull(partitions);
List<Tablet> tablets = Lists.newArrayList();
for (Partition partition : partitions) {
MaterializedIndex idx = partition.getBaseIndex();
for (Tablet tablet : idx.getTablets()) {
tablets.add(tablet);
}
}
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
if (remoteSchema == null || remoteSchema.isEmpty()) {
throw new AnalysisException("fetch remote tablet schema failed");
}
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
}
}

View File

@ -0,0 +1,220 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
import org.apache.doris.proto.InternalService.PTabletsLocation;
import org.apache.doris.proto.OlapFile.ColumnPB;
import org.apache.doris.proto.OlapFile.TabletSchemaPB;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// This class is used to pull the specified tablets' columns existing on the Backend (BE)
// including regular columns and columns decomposed by variants
public class FetchRemoteTabletSchemaUtil {
private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
private List<Tablet> remoteTablets;
private List<Column> tableColumns;
public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
this.remoteTablets = tablets;
this.tableColumns = Lists.newArrayList();
}
public List<Column> fetch() {
// 1. Find which Backend (BE) servers the tablets are on
Preconditions.checkNotNull(remoteTablets);
Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
for (Tablet tablet : remoteTablets) {
for (Replica replica : tablet.getReplicas()) {
// only need alive replica
if (replica.isAlive()) {
Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
replica.getBackendId(), k -> Sets.newHashSet());
tabletIds.add(tablet.getId());
}
}
}
// 2. Randomly select 2 Backend (BE) servers to act as coordinators.
// Coordinator BE is responsible for collecting all table columns and returning to the FE.
// Two BE provide a retry opportunity with the second one in case the first attempt fails.
List<PTabletsLocation> locations = Lists.newArrayList();
List<Backend> coordinatorBackend = Lists.newArrayList();
for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
Long backendId = entry.getKey();
Set<Long> tabletIds = entry.getValue();
Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
// only need alive be
if (!backend.isAlive()) {
continue;
}
// need 2 be to provide a retry
if (coordinatorBackend.size() < 2) {
coordinatorBackend.add(backend);
}
PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder()
.setHost(backend.getHost())
.setBrpcPort(backend.getBrpcPort());
PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build();
locations.add(location);
}
PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder()
.addAllTabletLocation(locations)
.setIsCoordinator(true);
// 3. Send rpc to coordinatorBackend util succeed or retry
for (Backend be : coordinatorBackend) {
try {
PFetchRemoteSchemaRequest request = requestBuilder.build();
Future<PFetchRemoteSchemaResponse> future = BackendServiceProxy.getInstance()
.fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request);
PFetchRemoteSchemaResponse response = null;
try {
response = future.get(60, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
String errMsg;
if (code != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = response.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = "fetchRemoteTabletSchemaAsync failed. backend address: "
+ be.getHost() + " : " + be.getBrpcPort();
}
throw new RpcException(be.getHost(), errMsg);
}
fillColumns(response);
return tableColumns;
} catch (AnalysisException e) {
// continue to get result
LOG.warn(e);
} catch (InterruptedException e) {
// continue to get result
LOG.warn("fetch remote schema future get interrupted Exception");
} catch (TimeoutException e) {
future.cancel(true);
// continue to get result
LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAddress());
}
} catch (RpcException e) {
LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAddress(), e);
} catch (ExecutionException e) {
LOG.warn("fetch remote schema ExecutionException, addr {}, e {}", be.getBrpcAddress(), e);
}
}
return tableColumns;
}
private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException {
TabletSchemaPB schemaPB = response.getMergedSchema();
for (ColumnPB columnPB : schemaPB.getColumnList()) {
try {
Column remoteColumn = initColumnFromPB(columnPB);
tableColumns.add(remoteColumn);
} catch (Exception e) {
throw new AnalysisException("column default value to string failed");
}
}
// sort the columns
Collections.sort(tableColumns, new Comparator<Column>() {
@Override
public int compare(Column c1, Column c2) {
return c1.getName().compareTo(c2.getName());
}
});
}
private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
try {
AggregateType aggType = AggregateType.getAggTypeFromAggName(column.getAggregation());
Type type = Type.getTypeFromTypeName(column.getType());
String columnName = column.getName();
boolean isKey = column.getIsKey();
boolean isNullable = column.getIsNullable();
String defaultValue = column.getDefaultValue().toString("UTF-8");
if (defaultValue.equals("")) {
defaultValue = null;
}
if (isKey) {
aggType = null;
}
do {
if (type.isArrayType()) {
List<ColumnPB> childColumn = column.getChildrenColumnsList();
if (childColumn == null || childColumn.size() != 1) {
break;
}
Column child = initColumnFromPB(childColumn.get(0));
type = new ArrayType(child.getType());
} else if (type.isMapType()) {
List<ColumnPB> childColumn = column.getChildrenColumnsList();
if (childColumn == null || childColumn.size() != 2) {
break;
}
Column keyChild = initColumnFromPB(childColumn.get(0));
Column valueChild = initColumnFromPB(childColumn.get(1));
type = new MapType(keyChild.getType(), valueChild.getType());
} else if (type.isStructType()) {
List<ColumnPB> childColumn = column.getChildrenColumnsList();
if (childColumn == null) {
break;
}
List<Type> childTypes = Lists.newArrayList();
for (ColumnPB childPB : childColumn) {
childTypes.add(initColumnFromPB(childPB).getType());
}
type = new StructType(childTypes);
}
} while (false);
return new Column(columnName, type, isKey, aggType, isNullable,
defaultValue, "remote schema");
} catch (Exception e) {
throw new AnalysisException("default value to string failed");
}
}
}

View File

@ -474,6 +474,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times";
public static final String DESCRIBE_EXTEND_VARIANT_COLUMN = "describe_extend_variant_column";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@ -857,6 +859,9 @@ public class SessionVariable implements Serializable, Writable {
return beNumberForTest;
}
@VariableMgr.VarAttr(name = DESCRIBE_EXTEND_VARIANT_COLUMN, needForward = true)
public boolean enableDescribeExtendVariantColumn = false;
@VariableMgr.VarAttr(name = PROFILLING)
public boolean profiling = false;
@ -3057,6 +3062,22 @@ public class SessionVariable implements Serializable, Writable {
}
}
public boolean getEnableDescribeExtendVariantColumn() {
return enableDescribeExtendVariantColumn;
}
public void setEnableDescribeExtendVariantColumn(boolean enableDescribeExtendVariantColumn) {
this.enableDescribeExtendVariantColumn = enableDescribeExtendVariantColumn;
}
public static boolean enableDescribeExtendVariantColumn() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return false;
}
return connectContext.getSessionVariable().enableDescribeExtendVariantColumn;
}
public int getProfileLevel() {
return this.profileLevel;
}

View File

@ -152,6 +152,11 @@ public class BackendServiceClient {
return stub.reportStreamLoadStatus(request);
}
public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync(
InternalService.PFetchRemoteSchemaRequest request) {
return stub.fetchRemoteTabletSchema(request);
}
public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) {
return stub.glob(request);
}

View File

@ -466,5 +466,17 @@ public class BackendServiceProxy {
}
}
public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync(
TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
return client.fetchRemoteTabletSchemaAsync(request);
} catch (Throwable e) {
LOG.warn("fetch remote tablet schema catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
}