[fix](tvf) Partition columns in CTAS need to be compatible with the STRING type of external tables/TVF (#37161)
bp: #35489
This commit is contained in:
@ -65,4 +65,12 @@ public class DistributionDesc {
|
||||
public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
|
||||
throw new NotImplementedException("toDistributionInfo not implemented");
|
||||
}
|
||||
|
||||
public List<String> getDistributionColumnNames() {
|
||||
throw new NotImplementedException("getDistributionColumnNames not implemented");
|
||||
}
|
||||
|
||||
public boolean inDistributionColumns(String columnName) {
|
||||
return getDistributionColumnNames() != null && getDistributionColumnNames().contains(columnName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +47,7 @@ public class HashDistributionDesc extends DistributionDesc {
|
||||
this.distributionColumnNames = distributionColumnNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDistributionColumnNames() {
|
||||
return distributionColumnNames;
|
||||
}
|
||||
|
||||
@ -285,4 +285,8 @@ public class PartitionDesc {
|
||||
throws DdlException, AnalysisException {
|
||||
throw new NotImplementedException("toPartitionInfo not implemented");
|
||||
}
|
||||
|
||||
public boolean inIdentifierPartitions(String colName) {
|
||||
return partitionColNames != null && partitionColNames.contains(colName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,4 +62,9 @@ public class RandomDistributionDesc extends DistributionDesc {
|
||||
RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket, autoBucket);
|
||||
return randomDistributionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDistributionColumnNames() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1264,7 +1264,19 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
if (resultExpr.getSrcSlotRef() != null
|
||||
&& resultExpr.getSrcSlotRef().getTable() != null
|
||||
&& !resultExpr.getSrcSlotRef().getTable().isManagedTable()) {
|
||||
typeDef = new TypeDef(ScalarType.createStringType());
|
||||
if (createTableStmt.getPartitionDesc().inIdentifierPartitions(
|
||||
resultExpr.getSrcSlotRef().getColumnName())
|
||||
|| (createTableStmt.getDistributionDesc() != null
|
||||
&& createTableStmt.getDistributionDesc().inDistributionColumns(
|
||||
resultExpr.getSrcSlotRef().getColumnName()))) {
|
||||
// STRING type cannot be used for partition/distribution columns,
|
||||
// so we replace it with VARCHAR
|
||||
if (resultType.getPrimitiveType() == PrimitiveType.STRING) {
|
||||
typeDef = new TypeDef(ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH));
|
||||
}
|
||||
} else {
|
||||
typeDef = new TypeDef(ScalarType.createStringType());
|
||||
}
|
||||
}
|
||||
} else if (resultType.isDecimalV2() && resultType.equals(ScalarType.DECIMALV2)) {
|
||||
typeDef = new TypeDef(ScalarType.createDecimalType(27, 9));
|
||||
|
||||
@ -206,7 +206,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to create database from hms client", e);
|
||||
throw new HMSClientException("failed to create table from hms client", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,10 +63,14 @@ public class PartitionTableInfo {
|
||||
null);
|
||||
|
||||
private boolean isAutoPartition;
|
||||
// for PartitionType
|
||||
private String partitionType;
|
||||
private List<String> partitionColumns;
|
||||
private List<PartitionDefinition> partitionDefs;
|
||||
// save all list partition expressions, including identifier and function
|
||||
private List<Expression> partitionList;
|
||||
// save identifier expressions in partitionList,
|
||||
// facilitates subsequent verification process
|
||||
private List<String> identifierPartitionColumns;
|
||||
|
||||
/**
|
||||
* struct for partition definition
|
||||
@ -85,7 +89,7 @@ public class PartitionTableInfo {
|
||||
this.partitionDefs = partitionDefs;
|
||||
this.partitionList = partitionFields;
|
||||
if (this.partitionList != null) {
|
||||
this.partitionColumns = this.partitionList.stream()
|
||||
this.identifierPartitionColumns = this.partitionList.stream()
|
||||
.filter(UnboundSlot.class::isInstance)
|
||||
.map(partition -> ((UnboundSlot) partition).getName())
|
||||
.collect(Collectors.toList());
|
||||
@ -100,10 +104,6 @@ public class PartitionTableInfo {
|
||||
return partitionType;
|
||||
}
|
||||
|
||||
public List<String> getPartitionColumns() {
|
||||
return partitionColumns;
|
||||
}
|
||||
|
||||
/**
|
||||
* check partitions types.
|
||||
*/
|
||||
@ -169,16 +169,16 @@ public class PartitionTableInfo {
|
||||
boolean isEnableMergeOnWrite,
|
||||
boolean isExternal) {
|
||||
|
||||
if (partitionColumns != null) {
|
||||
if (identifierPartitionColumns != null) {
|
||||
|
||||
if (partitionColumns.size() != partitionList.size()) {
|
||||
if (identifierPartitionColumns.size() != partitionList.size()) {
|
||||
if (!isExternal && partitionType.equalsIgnoreCase(PartitionType.LIST.name())) {
|
||||
throw new AnalysisException("internal catalog does not support functions in 'LIST' partition");
|
||||
}
|
||||
isAutoPartition = true;
|
||||
}
|
||||
|
||||
partitionColumns.forEach(p -> {
|
||||
identifierPartitionColumns.forEach(p -> {
|
||||
if (!columnMap.containsKey(p)) {
|
||||
throw new AnalysisException(
|
||||
String.format("partition key %s is not exists", p));
|
||||
@ -187,7 +187,7 @@ public class PartitionTableInfo {
|
||||
});
|
||||
|
||||
Set<String> partitionColumnSets = Sets.newHashSet();
|
||||
List<String> duplicatesKeys = partitionColumns.stream()
|
||||
List<String> duplicatesKeys = identifierPartitionColumns.stream()
|
||||
.filter(c -> !partitionColumnSets.add(c)).collect(Collectors.toList());
|
||||
if (!duplicatesKeys.isEmpty()) {
|
||||
throw new AnalysisException(
|
||||
@ -199,16 +199,16 @@ public class PartitionTableInfo {
|
||||
// 2. The partition field must be at the end of the schema
|
||||
// 3. The order of partition fields in the schema
|
||||
// must be consistent with the order defined in `PARTITIONED BY LIST()`
|
||||
if (partitionColumns.size() == columns.size()) {
|
||||
if (identifierPartitionColumns.size() == columns.size()) {
|
||||
throw new AnalysisException("Cannot set all columns as partitioning columns.");
|
||||
}
|
||||
List<ColumnDefinition> partitionInSchema = columns.subList(
|
||||
columns.size() - partitionColumns.size(), columns.size());
|
||||
if (partitionInSchema.stream().anyMatch(p -> !partitionColumns.contains(p.getName()))) {
|
||||
columns.size() - identifierPartitionColumns.size(), columns.size());
|
||||
if (partitionInSchema.stream().anyMatch(p -> !identifierPartitionColumns.contains(p.getName()))) {
|
||||
throw new AnalysisException("The partition field must be at the end of the schema.");
|
||||
}
|
||||
for (int i = 0; i < partitionInSchema.size(); i++) {
|
||||
if (!partitionInSchema.get(i).getName().equals(partitionColumns.get(i))) {
|
||||
if (!partitionInSchema.get(i).getName().equals(identifierPartitionColumns.get(i))) {
|
||||
throw new AnalysisException("The order of partition fields in the schema "
|
||||
+ "must be consistent with the order defined in `PARTITIONED BY LIST()`");
|
||||
}
|
||||
@ -234,7 +234,7 @@ public class PartitionTableInfo {
|
||||
partitionNames.add(partitionName);
|
||||
}
|
||||
partitionDefs.forEach(p -> {
|
||||
p.setPartitionTypes(partitionColumns.stream()
|
||||
p.setPartitionTypes(identifierPartitionColumns.stream()
|
||||
.map(s -> columnMap.get(s).getType()).collect(Collectors.toList()));
|
||||
p.validate(Maps.newHashMap(properties));
|
||||
});
|
||||
@ -269,18 +269,18 @@ public class PartitionTableInfo {
|
||||
|
||||
try {
|
||||
ArrayList<Expr> exprs = convertToLegacyAutoPartitionExprs(partitionList);
|
||||
// here we have already extracted partitionColumns
|
||||
// here we have already extracted identifierPartitionColumns
|
||||
if (partitionType.equals(PartitionType.RANGE.name())) {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new RangePartitionDesc(exprs, partitionColumns, partitionDescs);
|
||||
partitionDesc = new RangePartitionDesc(exprs, identifierPartitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs);
|
||||
partitionDesc = new RangePartitionDesc(identifierPartitionColumns, partitionDescs);
|
||||
}
|
||||
} else {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new ListPartitionDesc(exprs, partitionColumns, partitionDescs);
|
||||
partitionDesc = new ListPartitionDesc(exprs, identifierPartitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs);
|
||||
partitionDesc = new ListPartitionDesc(identifierPartitionColumns, partitionDescs);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -319,7 +319,7 @@ public class PartitionTableInfo {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get column names and put in partitionColumns
|
||||
* Get column names and put in identifierPartitionColumns
|
||||
*/
|
||||
public void extractPartitionColumns() throws AnalysisException {
|
||||
if (partitionList == null) {
|
||||
@ -327,10 +327,14 @@ public class PartitionTableInfo {
|
||||
}
|
||||
ArrayList<Expr> exprs = convertToLegacyAutoPartitionExprs(partitionList);
|
||||
try {
|
||||
partitionColumns = PartitionDesc.getColNamesFromExpr(exprs,
|
||||
identifierPartitionColumns = PartitionDesc.getColNamesFromExpr(exprs,
|
||||
partitionType.equalsIgnoreCase(PartitionType.LIST.name()), isAutoPartition);
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException(e.getMessage(), e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean inIdentifierPartitions(String columnName) {
|
||||
return identifierPartitionColumns != null && identifierPartitionColumns.contains(columnName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -128,10 +128,19 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
|
||||
dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
|
||||
DecimalV2Type.class, DecimalV2Type.SYSTEM_DEFAULT);
|
||||
if (s.isColumnFromTable()) {
|
||||
if (!((SlotReference) s).getTable().isPresent()
|
||||
|| !((SlotReference) s).getTable().get().isManagedTable()) {
|
||||
dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
|
||||
CharacterType.class, StringType.INSTANCE);
|
||||
if ((!((SlotReference) s).getTable().isPresent()
|
||||
|| !((SlotReference) s).getTable().get().isManagedTable())) {
|
||||
if (createTableInfo.getPartitionTableInfo().inIdentifierPartitions(s.getName())
|
||||
|| (createTableInfo.getDistribution() != null
|
||||
&& createTableInfo.getDistribution().inDistributionColumns(s.getName()))) {
|
||||
// STRING type cannot be used for partition/distribution columns,
|
||||
// so we replace it with VARCHAR
|
||||
dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
|
||||
StringType.class, VarcharType.MAX_VARCHAR_TYPE);
|
||||
} else {
|
||||
dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
|
||||
CharacterType.class, StringType.INSTANCE);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
|
||||
|
||||
@ -801,5 +801,13 @@ public class CreateTableInfo {
|
||||
public void setIsExternal(boolean isExternal) {
|
||||
this.isExternal = isExternal;
|
||||
}
|
||||
|
||||
public PartitionTableInfo getPartitionTableInfo() {
|
||||
return partitionTableInfo;
|
||||
}
|
||||
|
||||
public DistributionDescriptor getDistribution() {
|
||||
return distribution;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -107,4 +107,8 @@ public class DistributionDescriptor {
|
||||
}
|
||||
return new RandomDistributionDesc(bucketNum, isAutoBucket);
|
||||
}
|
||||
|
||||
public boolean inDistributionColumns(String columnName) {
|
||||
return cols != null && cols.contains(columnName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -469,7 +469,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
|
||||
// HACK(tsy): path columns are all treated as STRING type now, after BE supports reading all columns
|
||||
// types by all format readers from file meta, maybe reading path columns types from BE then.
|
||||
for (String colName : pathPartitionKeys) {
|
||||
columns.add(new Column(colName, Type.STRING, false));
|
||||
columns.add(new Column(colName, ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), false));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user