[feature](external catalog)Add partition grammar for external catalog to create table (#31585)
The `PARTITION BY` syntax used by external catalogs has been added. You can specify a column directly, or a partition function as a partition condition. Like: `PARTITION BY LIST(col1, col2, func(param), func(param1, param2), func(param1, param2, param3))` NOTICE: This PR change the grammar of `AUTO PARTITION` From ``` AUTO PARTITION BY RANGE date_trunc(`TIME_STAMP`, 'month') ``` To ``` AUTO PARTITION BY RANGE (date_trunc(`TIME_STAMP`, 'month')) ```
This commit is contained in:
@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
// to describe the key list partition's information in create table stmt
|
||||
public class ListPartitionDesc extends PartitionDesc {
|
||||
@ -37,6 +38,9 @@ public class ListPartitionDesc extends PartitionDesc {
|
||||
super(partitionColNames, allPartitionDescs);
|
||||
type = PartitionType.LIST;
|
||||
this.isAutoCreatePartitions = false;
|
||||
this.partitionExprs = new ArrayList<>(partitionColNames.stream()
|
||||
.map(col -> new SlotRef(null, col))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public ListPartitionDesc(ArrayList<Expr> exprs, List<String> partitionColNames,
|
||||
@ -68,12 +72,12 @@ public class ListPartitionDesc extends PartitionDesc {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("PARTITION BY LIST(");
|
||||
int idx = 0;
|
||||
for (String column : partitionColNames) {
|
||||
if (idx != 0) {
|
||||
for (Expr e : partitionExprs) {
|
||||
if (idx > 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append("`").append(column).append("`");
|
||||
idx++;
|
||||
sb.append(e.toSql());
|
||||
}
|
||||
sb.append(")\n(\n");
|
||||
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
// to describe the key range partition's information in create table stmt
|
||||
public class RangePartitionDesc extends PartitionDesc {
|
||||
@ -35,6 +36,9 @@ public class RangePartitionDesc extends PartitionDesc {
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
super(partitionColNames, allPartitionDescs);
|
||||
type = org.apache.doris.catalog.PartitionType.RANGE;
|
||||
this.partitionExprs = new ArrayList<>(partitionColNames.stream()
|
||||
.map(col -> new SlotRef(null, col))
|
||||
.collect(Collectors.toList()));
|
||||
this.isAutoCreatePartitions = false;
|
||||
}
|
||||
|
||||
|
||||
@ -2435,33 +2435,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
// NOTICE: we should not generate immutable map here, because it will be modified when analyzing.
|
||||
? Maps.newHashMap(visitPropertyClause(ctx.extProperties))
|
||||
: Maps.newHashMap();
|
||||
String partitionType = null;
|
||||
if (ctx.PARTITION() != null) {
|
||||
partitionType = ctx.RANGE() != null ? "RANGE" : "LIST";
|
||||
}
|
||||
boolean isAutoPartition = ctx.autoPartition != null;
|
||||
ImmutableList.Builder<Expression> autoPartitionExpr = new ImmutableList.Builder<>();
|
||||
if (isAutoPartition) {
|
||||
if (ctx.RANGE() != null) {
|
||||
// AUTO PARTITION BY RANGE FUNC_CALL_EXPR
|
||||
if (ctx.partitionExpr != null) {
|
||||
autoPartitionExpr.add(visitFunctionCallExpression(ctx.partitionExpr));
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"AUTO PARTITION BY RANGE must provide a function expr");
|
||||
}
|
||||
} else {
|
||||
// AUTO PARTITION BY LIST(`partition_col`)
|
||||
if (ctx.partitionKeys != null) {
|
||||
// only support one column in auto partition
|
||||
autoPartitionExpr.addAll(visitIdentifierList(ctx.partitionKeys).stream()
|
||||
.distinct().map(name -> UnboundSlot.quoted(name))
|
||||
.collect(Collectors.toList()));
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"AUTO PARTITION BY List must provide a partition column");
|
||||
}
|
||||
}
|
||||
|
||||
// solve partition by
|
||||
PartitionTableInfo partitionInfo;
|
||||
if (ctx.partition != null) {
|
||||
partitionInfo = (PartitionTableInfo) ctx.partitionTable().accept(this);
|
||||
} else {
|
||||
partitionInfo = PartitionTableInfo.EMPTY;
|
||||
}
|
||||
|
||||
if (ctx.columnDefs() != null) {
|
||||
@ -2480,11 +2460,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
keysType,
|
||||
ctx.keys != null ? visitIdentifierList(ctx.keys) : ImmutableList.of(),
|
||||
comment,
|
||||
isAutoPartition,
|
||||
autoPartitionExpr.build(),
|
||||
partitionType,
|
||||
ctx.partitionKeys != null ? visitIdentifierList(ctx.partitionKeys) : null,
|
||||
ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null,
|
||||
partitionInfo,
|
||||
desc,
|
||||
ctx.rollupDefs() != null ? visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(),
|
||||
properties,
|
||||
@ -2502,11 +2478,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
keysType,
|
||||
ctx.keys != null ? visitIdentifierList(ctx.keys) : ImmutableList.of(),
|
||||
comment,
|
||||
isAutoPartition,
|
||||
autoPartitionExpr.build(),
|
||||
partitionType,
|
||||
ctx.partitionKeys != null ? visitIdentifierList(ctx.partitionKeys) : null,
|
||||
ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null,
|
||||
partitionInfo,
|
||||
desc,
|
||||
ctx.rollupDefs() != null ? visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(),
|
||||
properties,
|
||||
@ -2517,6 +2489,26 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionTableInfo visitPartitionTable(DorisParser.PartitionTableContext ctx) {
|
||||
boolean isAutoPartition = ctx.autoPartition != null;
|
||||
ImmutableList<Expression> partitionList = ctx.partitionList.identityOrFunction().stream()
|
||||
.map(partition -> {
|
||||
IdentifierContext identifier = partition.identifier();
|
||||
if (identifier != null) {
|
||||
return UnboundSlot.quoted(identifier.getText());
|
||||
} else {
|
||||
return visitFunctionCallExpression(partition.functionCallExpression());
|
||||
}
|
||||
})
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
return new PartitionTableInfo(
|
||||
isAutoPartition,
|
||||
ctx.RANGE() != null ? "RANGE" : "LIST",
|
||||
ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null,
|
||||
partitionList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ColumnDefinition> visitColumnDefs(ColumnDefsContext ctx) {
|
||||
return ctx.cols.stream().map(this::visitColumnDef).collect(Collectors.toList());
|
||||
|
||||
@ -0,0 +1,292 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.parser;
|
||||
|
||||
import org.apache.doris.analysis.AllPartitionDesc;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.FunctionCallExpr;
|
||||
import org.apache.doris.analysis.FunctionParams;
|
||||
import org.apache.doris.analysis.ListPartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.RangePartitionDesc;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.StringLiteral;
|
||||
import org.apache.doris.catalog.AggregateType;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.nereids.analyzer.UnboundFunction;
|
||||
import org.apache.doris.nereids.analyzer.UnboundSlot;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.FixedRangePartition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.InPartition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* partition info for 'PARTITION BY'
|
||||
*/
|
||||
public class PartitionTableInfo {
|
||||
|
||||
public static final PartitionTableInfo EMPTY = new PartitionTableInfo(
|
||||
false,
|
||||
PartitionType.UNPARTITIONED.name(),
|
||||
null,
|
||||
null);
|
||||
|
||||
private boolean isAutoPartition;
|
||||
private String partitionType;
|
||||
private List<String> partitionColumns;
|
||||
private List<PartitionDefinition> partitionDefs;
|
||||
private List<Expression> partitionList;
|
||||
|
||||
/**
|
||||
* struct for partition definition
|
||||
*
|
||||
* @param isAutoPartition Whether it is an automatic partition
|
||||
* @param partitionType partition type
|
||||
* @param partitionFields partition fields
|
||||
*/
|
||||
public PartitionTableInfo(
|
||||
boolean isAutoPartition,
|
||||
String partitionType,
|
||||
List<PartitionDefinition> partitionDefs,
|
||||
List<Expression> partitionFields) {
|
||||
this.isAutoPartition = isAutoPartition;
|
||||
this.partitionType = partitionType;
|
||||
this.partitionDefs = partitionDefs;
|
||||
this.partitionList = partitionFields;
|
||||
if (this.partitionList != null) {
|
||||
this.partitionColumns = this.partitionList.stream()
|
||||
.filter(UnboundSlot.class::isInstance)
|
||||
.map(partition -> ((UnboundSlot) partition).getName())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isAutoPartition() {
|
||||
return isAutoPartition;
|
||||
}
|
||||
|
||||
public String getPartitionType() {
|
||||
return partitionType;
|
||||
}
|
||||
|
||||
public List<String> getPartitionColumns() {
|
||||
return partitionColumns;
|
||||
}
|
||||
|
||||
/**
|
||||
* check partitions types.
|
||||
*/
|
||||
private boolean checkPartitionsTypes() {
|
||||
if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) {
|
||||
if (partitionDefs.stream().allMatch(
|
||||
p -> p instanceof StepPartition || p instanceof FixedRangePartition)) {
|
||||
return true;
|
||||
}
|
||||
return partitionDefs.stream().allMatch(
|
||||
p -> (p instanceof LessThanPartition) || (p instanceof FixedRangePartition));
|
||||
}
|
||||
return partitionType.equalsIgnoreCase(PartitionType.LIST.name())
|
||||
&& partitionDefs.stream().allMatch(p -> p instanceof InPartition);
|
||||
}
|
||||
|
||||
private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx, boolean isEnableMergeOnWrite) {
|
||||
if (!column.isKey()
|
||||
&& (!column.getAggType().equals(AggregateType.NONE) || isEnableMergeOnWrite)) {
|
||||
throw new AnalysisException("The partition column could not be aggregated column");
|
||||
}
|
||||
if (column.getType().isFloatLikeType()) {
|
||||
throw new AnalysisException("Floating point type column can not be partition column");
|
||||
}
|
||||
if (column.getType().isStringType()) {
|
||||
throw new AnalysisException("String Type should not be used in partition column["
|
||||
+ column.getName() + "].");
|
||||
}
|
||||
if (column.getType().isComplexType()) {
|
||||
throw new AnalysisException("Complex type column can't be partition column: "
|
||||
+ column.getType().toString());
|
||||
}
|
||||
// prohibit to create auto partition with null column anyhow
|
||||
if (this.isAutoPartition && column.isNullable()) {
|
||||
throw new AnalysisException("The auto partition column must be NOT NULL");
|
||||
}
|
||||
if (!ctx.getSessionVariable().isAllowPartitionColumnNullable() && column.isNullable()) {
|
||||
throw new AnalysisException(
|
||||
"The partition column must be NOT NULL with allow_partition_column_nullable OFF");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the relationship between partitions and columns
|
||||
*
|
||||
* @param columnMap column map of table
|
||||
* @param properties properties of table
|
||||
* @param ctx context
|
||||
* @param isEnableMergeOnWrite whether enable merge on write
|
||||
*/
|
||||
public void validatePartitionInfo(
|
||||
Map<String, ColumnDefinition> columnMap,
|
||||
Map<String, String> properties,
|
||||
ConnectContext ctx,
|
||||
boolean isEnableMergeOnWrite,
|
||||
boolean isExternal) {
|
||||
|
||||
if (partitionColumns != null) {
|
||||
|
||||
if (partitionColumns.size() != partitionList.size()) {
|
||||
if (!isExternal && partitionType.equalsIgnoreCase(PartitionType.LIST.name())) {
|
||||
throw new AnalysisException("internal catalog does not support functions in 'LIST' partition");
|
||||
}
|
||||
isAutoPartition = true;
|
||||
}
|
||||
|
||||
partitionColumns.forEach(p -> {
|
||||
if (!columnMap.containsKey(p)) {
|
||||
throw new AnalysisException(
|
||||
String.format("partition key %s is not exists", p));
|
||||
}
|
||||
validatePartitionColumn(columnMap.get(p), ctx, isEnableMergeOnWrite);
|
||||
});
|
||||
|
||||
Set<String> partitionColumnSets = Sets.newHashSet();
|
||||
List<String> duplicatesKeys = partitionColumns.stream()
|
||||
.filter(c -> !partitionColumnSets.add(c)).collect(Collectors.toList());
|
||||
if (!duplicatesKeys.isEmpty()) {
|
||||
throw new AnalysisException(
|
||||
"Duplicated partition column " + duplicatesKeys.get(0));
|
||||
}
|
||||
|
||||
if (partitionDefs != null) {
|
||||
if (!checkPartitionsTypes()) {
|
||||
throw new AnalysisException(
|
||||
"partitions types is invalid, expected FIXED or LESS in range partitions"
|
||||
+ " and IN in list partitions");
|
||||
}
|
||||
Set<String> partitionNames = Sets.newHashSet();
|
||||
for (PartitionDefinition partition : partitionDefs) {
|
||||
if (partition instanceof StepPartition) {
|
||||
continue;
|
||||
}
|
||||
String partitionName = partition.getPartitionName();
|
||||
if (partitionNames.contains(partitionName)) {
|
||||
throw new AnalysisException(
|
||||
"Duplicated named partition: " + partitionName);
|
||||
}
|
||||
partitionNames.add(partitionName);
|
||||
}
|
||||
partitionDefs.forEach(p -> {
|
||||
p.setPartitionTypes(partitionColumns.stream()
|
||||
.map(s -> columnMap.get(s).getType()).collect(Collectors.toList()));
|
||||
p.validate(Maps.newHashMap(properties));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert to PartitionDesc types.
|
||||
*/
|
||||
public PartitionDesc convertToPartitionDesc(boolean isExternal) {
|
||||
PartitionDesc partitionDesc = null;
|
||||
if (isExternal) {
|
||||
isAutoPartition = true;
|
||||
}
|
||||
if (!partitionType.equalsIgnoreCase(PartitionType.UNPARTITIONED.name())) {
|
||||
List<AllPartitionDesc> partitionDescs =
|
||||
partitionDefs != null
|
||||
? partitionDefs.stream().map(PartitionDefinition::translateToCatalogStyle)
|
||||
.collect(Collectors.toList())
|
||||
: null;
|
||||
|
||||
int createTablePartitionMaxNum = ConnectContext.get().getSessionVariable().getCreateTablePartitionMaxNum();
|
||||
if (partitionDescs != null && partitionDescs.size() > createTablePartitionMaxNum) {
|
||||
throw new org.apache.doris.nereids.exceptions.AnalysisException(String.format(
|
||||
"The number of partitions to be created is [%s], exceeding the maximum value of [%s]. "
|
||||
+ "Creating too many partitions can be time-consuming. If necessary, "
|
||||
+ "You can set the session variable 'create_table_partition_max_num' "
|
||||
+ "to a larger value.",
|
||||
partitionDescs.size(), createTablePartitionMaxNum));
|
||||
}
|
||||
|
||||
try {
|
||||
if (partitionType.equals(PartitionType.RANGE.name())) {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new RangePartitionDesc(
|
||||
convertToLegacyAutoPartitionExprs(partitionList),
|
||||
partitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs);
|
||||
}
|
||||
} else {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new ListPartitionDesc(
|
||||
convertToLegacyAutoPartitionExprs(partitionList),
|
||||
partitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException(e.getMessage(), e.getCause());
|
||||
}
|
||||
}
|
||||
return partitionDesc;
|
||||
}
|
||||
|
||||
private static ArrayList<Expr> convertToLegacyAutoPartitionExprs(List<Expression> expressions) {
|
||||
return new ArrayList<>(expressions.stream().map(expression -> {
|
||||
if (expression instanceof UnboundSlot) {
|
||||
return new SlotRef(null, ((UnboundSlot) expression).getName());
|
||||
} else if (expression instanceof UnboundFunction) {
|
||||
UnboundFunction function = (UnboundFunction) expression;
|
||||
return new FunctionCallExpr(
|
||||
function.getName(),
|
||||
new FunctionParams(convertToLegacyArguments(function.children())));
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"unsupported auto partition expr " + expression.toString());
|
||||
}
|
||||
}).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private static List<Expr> convertToLegacyArguments(List<Expression> children) {
|
||||
return children.stream().map(child -> {
|
||||
if (child instanceof UnboundSlot) {
|
||||
return new SlotRef(null, ((UnboundSlot) child).getName());
|
||||
} else if (child instanceof Literal) {
|
||||
return new StringLiteral(((Literal) child).getStringValue());
|
||||
} else {
|
||||
throw new AnalysisException("unsupported argument " + child.toString());
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@ -184,4 +184,9 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitCreateTableCommand(this, context);
|
||||
}
|
||||
|
||||
// for test
|
||||
public CreateTableInfo getCreateTableInfo() {
|
||||
return createTableInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,27 +17,18 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands.info;
|
||||
|
||||
import org.apache.doris.analysis.AllPartitionDesc;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.FunctionCallExpr;
|
||||
import org.apache.doris.analysis.FunctionParams;
|
||||
import org.apache.doris.analysis.IndexDef;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.ListPartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.RangePartitionDesc;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.StringLiteral;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.AggregateType;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Index;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
@ -52,13 +43,9 @@ import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.datasource.es.EsUtil;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.nereids.analyzer.UnboundFunction;
|
||||
import org.apache.doris.nereids.analyzer.UnboundSlot;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.parser.PartitionTableInfo;
|
||||
import org.apache.doris.nereids.types.DataType;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
@ -69,7 +56,6 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -94,22 +80,17 @@ public class CreateTableInfo {
|
||||
private KeysType keysType;
|
||||
private List<String> keys;
|
||||
private final String comment;
|
||||
private final String partitionType;
|
||||
private List<String> partitionColumns;
|
||||
private final List<PartitionDefinition> partitions;
|
||||
private DistributionDescriptor distribution;
|
||||
private final List<RollupDefinition> rollups;
|
||||
private Map<String, String> properties;
|
||||
private Map<String, String> extProperties;
|
||||
private boolean isEnableMergeOnWrite = false;
|
||||
|
||||
private final boolean isAutoPartition;
|
||||
private final List<Expression> autoPartitionExprs;
|
||||
|
||||
private boolean isExternal = false;
|
||||
private String clusterName = null;
|
||||
private List<String> clusterKeysColumnNames = null;
|
||||
private List<Integer> clusterKeysColumnIds = null;
|
||||
private PartitionTableInfo partitionTableInfo;
|
||||
|
||||
/**
|
||||
* constructor for create table
|
||||
@ -117,8 +98,7 @@ public class CreateTableInfo {
|
||||
public CreateTableInfo(boolean ifNotExists, boolean isExternal, String ctlName, String dbName,
|
||||
String tableName, List<ColumnDefinition> columns, List<IndexDefinition> indexes,
|
||||
String engineName, KeysType keysType, List<String> keys, String comment,
|
||||
boolean isAutoPartition, List<Expression> autoPartitionExprs, String partitionType,
|
||||
List<String> partitionColumns, List<PartitionDefinition> partitions,
|
||||
PartitionTableInfo partitionTableInfo,
|
||||
DistributionDescriptor distribution, List<RollupDefinition> rollups,
|
||||
Map<String, String> properties, Map<String, String> extProperties,
|
||||
List<String> clusterKeyColumnNames) {
|
||||
@ -134,11 +114,7 @@ public class CreateTableInfo {
|
||||
this.keysType = keysType;
|
||||
this.keys = Utils.copyRequiredList(keys);
|
||||
this.comment = comment;
|
||||
this.isAutoPartition = isAutoPartition;
|
||||
this.autoPartitionExprs = autoPartitionExprs;
|
||||
this.partitionType = partitionType;
|
||||
this.partitionColumns = partitionColumns;
|
||||
this.partitions = partitions;
|
||||
this.partitionTableInfo = partitionTableInfo;
|
||||
this.distribution = distribution;
|
||||
this.rollups = Utils.copyRequiredList(rollups);
|
||||
this.properties = properties;
|
||||
@ -151,9 +127,8 @@ public class CreateTableInfo {
|
||||
*/
|
||||
public CreateTableInfo(boolean ifNotExists, boolean isExternal, String ctlName, String dbName,
|
||||
String tableName, List<String> cols, String engineName, KeysType keysType,
|
||||
List<String> keys, String comment, boolean isAutoPartition,
|
||||
List<Expression> autoPartitionExprs, String partitionType,
|
||||
List<String> partitionColumns, List<PartitionDefinition> partitions,
|
||||
List<String> keys, String comment,
|
||||
PartitionTableInfo partitionTableInfo,
|
||||
DistributionDescriptor distribution, List<RollupDefinition> rollups,
|
||||
Map<String, String> properties, Map<String, String> extProperties,
|
||||
List<String> clusterKeyColumnNames) {
|
||||
@ -169,11 +144,7 @@ public class CreateTableInfo {
|
||||
this.keysType = keysType;
|
||||
this.keys = Utils.copyRequiredList(keys);
|
||||
this.comment = comment;
|
||||
this.isAutoPartition = isAutoPartition;
|
||||
this.autoPartitionExprs = autoPartitionExprs;
|
||||
this.partitionType = partitionType;
|
||||
this.partitionColumns = partitionColumns;
|
||||
this.partitions = partitions;
|
||||
this.partitionTableInfo = partitionTableInfo;
|
||||
this.distribution = distribution;
|
||||
this.rollups = Utils.copyRequiredList(rollups);
|
||||
this.properties = properties;
|
||||
@ -455,54 +426,8 @@ public class CreateTableInfo {
|
||||
}
|
||||
});
|
||||
|
||||
if (isAutoPartition) {
|
||||
partitionColumns = ExpressionUtils
|
||||
.collectAll(autoPartitionExprs, UnboundSlot.class::isInstance).stream()
|
||||
.map(slot -> ((UnboundSlot) slot).getName()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
if (partitionColumns != null) {
|
||||
partitionColumns.forEach(p -> {
|
||||
if (!columnMap.containsKey(p)) {
|
||||
throw new AnalysisException(
|
||||
String.format("partition key %s is not exists", p));
|
||||
}
|
||||
validatePartitionColumn(columnMap.get(p), ctx);
|
||||
});
|
||||
|
||||
Set<String> partitionColumnSets = Sets.newHashSet();
|
||||
List<String> duplicatesKeys = partitionColumns.stream()
|
||||
.filter(c -> !partitionColumnSets.add(c)).collect(Collectors.toList());
|
||||
if (!duplicatesKeys.isEmpty()) {
|
||||
throw new AnalysisException(
|
||||
"Duplicated partition column " + duplicatesKeys.get(0));
|
||||
}
|
||||
|
||||
if (partitions != null) {
|
||||
if (!checkPartitionsTypes()) {
|
||||
throw new AnalysisException(
|
||||
"partitions types is invalid, expected FIXED or LESS in range partitions"
|
||||
+ " and IN in list partitions");
|
||||
}
|
||||
Set<String> partitionNames = Sets.newHashSet();
|
||||
for (PartitionDefinition partition : partitions) {
|
||||
if (partition instanceof StepPartition) {
|
||||
continue;
|
||||
}
|
||||
String partitionName = partition.getPartitionName();
|
||||
if (partitionNames.contains(partitionName)) {
|
||||
throw new AnalysisException(
|
||||
"Duplicated named partition: " + partitionName);
|
||||
}
|
||||
partitionNames.add(partitionName);
|
||||
}
|
||||
partitions.forEach(p -> {
|
||||
p.setPartitionTypes(partitionColumns.stream()
|
||||
.map(s -> columnMap.get(s).getType()).collect(Collectors.toList()));
|
||||
p.validate(Maps.newHashMap(properties));
|
||||
});
|
||||
}
|
||||
}
|
||||
// validate partition
|
||||
partitionTableInfo.validatePartitionInfo(columnMap, properties, ctx, isEnableMergeOnWrite, isExternal);
|
||||
|
||||
// validate distribution descriptor
|
||||
distribution.updateCols(columns.get(0).getName());
|
||||
@ -532,6 +457,16 @@ public class CreateTableInfo {
|
||||
"Create " + engineName + " table should not contain keys desc");
|
||||
}
|
||||
|
||||
if (!rollups.isEmpty()) {
|
||||
throw new AnalysisException(engineName + " catalog doesn't support rollup tables.");
|
||||
}
|
||||
|
||||
if (engineName.equalsIgnoreCase("iceberg") && distribution != null) {
|
||||
throw new AnalysisException(
|
||||
"Iceberg doesn't support 'DISTRIBUTE BY', "
|
||||
+ "and you can use 'bucket(num, column)' in 'PARTITIONED BY'.");
|
||||
}
|
||||
|
||||
for (ColumnDefinition columnDef : columns) {
|
||||
columnDef.setIsKey(true);
|
||||
}
|
||||
@ -599,22 +534,6 @@ public class CreateTableInfo {
|
||||
validate(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* check partitions types.
|
||||
*/
|
||||
private boolean checkPartitionsTypes() {
|
||||
if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) {
|
||||
if (partitions.stream().allMatch(
|
||||
p -> p instanceof StepPartition || p instanceof FixedRangePartition)) {
|
||||
return true;
|
||||
}
|
||||
return partitions.stream().allMatch(
|
||||
p -> (p instanceof LessThanPartition) || (p instanceof FixedRangePartition));
|
||||
}
|
||||
return partitionType.equalsIgnoreCase(PartitionType.LIST.name())
|
||||
&& partitions.stream().allMatch(p -> p instanceof InPartition);
|
||||
}
|
||||
|
||||
private void checkEngineName() {
|
||||
if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
|
||||
|| engineName.equals("elasticsearch") || engineName.equals("hive") || engineName.equals("iceberg")
|
||||
@ -643,32 +562,6 @@ public class CreateTableInfo {
|
||||
}
|
||||
}
|
||||
|
||||
private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx) {
|
||||
if (!column.isKey()
|
||||
&& (!column.getAggType().equals(AggregateType.NONE) || isEnableMergeOnWrite)) {
|
||||
throw new AnalysisException("The partition column could not be aggregated column");
|
||||
}
|
||||
if (column.getType().isFloatLikeType()) {
|
||||
throw new AnalysisException("Floating point type column can not be partition column");
|
||||
}
|
||||
if (column.getType().isStringType()) {
|
||||
throw new AnalysisException("String Type should not be used in partition column["
|
||||
+ column.getName() + "].");
|
||||
}
|
||||
if (column.getType().isComplexType()) {
|
||||
throw new AnalysisException("Complex type column can't be partition column: "
|
||||
+ column.getType().toString());
|
||||
}
|
||||
// prohibit to create auto partition with null column anyhow
|
||||
if (this.isAutoPartition && column.isNullable()) {
|
||||
throw new AnalysisException("The auto partition column must be NOT NULL");
|
||||
}
|
||||
if (!ctx.getSessionVariable().isAllowPartitionColumnNullable() && column.isNullable()) {
|
||||
throw new AnalysisException(
|
||||
"The partition column must be NOT NULL with allow_partition_column_nullable OFF");
|
||||
}
|
||||
}
|
||||
|
||||
// if auto bucket auto bucket enable, rewrite distribution bucket num &&
|
||||
// set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
|
||||
private static Map<String, String> maybeRewriteByAutoBucket(
|
||||
@ -795,47 +688,7 @@ public class CreateTableInfo {
|
||||
* translate to catalog create table stmt
|
||||
*/
|
||||
public CreateTableStmt translateToLegacyStmt() {
|
||||
PartitionDesc partitionDesc = null;
|
||||
if (partitionColumns != null || isAutoPartition) {
|
||||
List<AllPartitionDesc> partitionDescs =
|
||||
partitions != null
|
||||
? partitions.stream().map(PartitionDefinition::translateToCatalogStyle)
|
||||
.collect(Collectors.toList())
|
||||
: null;
|
||||
|
||||
int createTablePartitionMaxNum = ConnectContext.get().getSessionVariable().getCreateTablePartitionMaxNum();
|
||||
if (partitionDescs != null && partitionDescs.size() > createTablePartitionMaxNum) {
|
||||
throw new org.apache.doris.nereids.exceptions.AnalysisException(String.format(
|
||||
"The number of partitions to be created is [%s], exceeding the maximum value of [%s]. "
|
||||
+ "Creating too many partitions can be time-consuming. If necessary, "
|
||||
+ "You can set the session variable 'create_table_partition_max_num' "
|
||||
+ "to a larger value.",
|
||||
partitionDescs.size(), createTablePartitionMaxNum));
|
||||
}
|
||||
|
||||
try {
|
||||
if (partitionType.equals(PartitionType.RANGE.name())) {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new RangePartitionDesc(
|
||||
convertToLegacyAutoPartitionExprs(autoPartitionExprs),
|
||||
partitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs);
|
||||
}
|
||||
} else {
|
||||
if (isAutoPartition) {
|
||||
partitionDesc = new ListPartitionDesc(
|
||||
convertToLegacyAutoPartitionExprs(autoPartitionExprs),
|
||||
partitionColumns, partitionDescs);
|
||||
} else {
|
||||
partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException(e.getMessage(), e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
PartitionDesc partitionDesc = partitionTableInfo.convertToPartitionDesc(isExternal);
|
||||
List<AlterClause> addRollups = Lists.newArrayList();
|
||||
if (!rollups.isEmpty()) {
|
||||
addRollups.addAll(rollups.stream().map(RollupDefinition::translateToCatalogStyle)
|
||||
@ -859,9 +712,13 @@ public class CreateTableInfo {
|
||||
throw new AnalysisException(e.getMessage(), e.getCause());
|
||||
}
|
||||
} else if (!engineName.equals("olap")) {
|
||||
if (partitionDesc != null || distributionDesc != null) {
|
||||
if (!engineName.equals("hive") && distributionDesc != null) {
|
||||
throw new AnalysisException("Create " + engineName
|
||||
+ " table should not contain partition or distribution desc");
|
||||
+ " table should not contain distribution desc");
|
||||
}
|
||||
if (!engineName.equals("hive") && !engineName.equals("iceberg") && partitionDesc != null) {
|
||||
throw new AnalysisException("Create " + engineName
|
||||
+ " table should not contain partition desc");
|
||||
}
|
||||
}
|
||||
|
||||
@ -872,31 +729,4 @@ public class CreateTableInfo {
|
||||
partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
|
||||
comment, addRollups, null);
|
||||
}
|
||||
|
||||
private static ArrayList<Expr> convertToLegacyAutoPartitionExprs(List<Expression> expressions) {
|
||||
return new ArrayList<>(expressions.stream().map(expression -> {
|
||||
if (expression instanceof UnboundSlot) {
|
||||
return new SlotRef(null, ((UnboundSlot) expression).getName());
|
||||
} else if (expression instanceof UnboundFunction) {
|
||||
UnboundFunction function = (UnboundFunction) expression;
|
||||
return new FunctionCallExpr(function.getName(),
|
||||
new FunctionParams(convertToLegacyArguments(function.children())));
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"unsupported auto partition expr " + expression.toString());
|
||||
}
|
||||
}).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private static List<Expr> convertToLegacyArguments(List<Expression> children) {
|
||||
return children.stream().map(child -> {
|
||||
if (child instanceof UnboundSlot) {
|
||||
return new SlotRef(null, ((UnboundSlot) child).getName());
|
||||
} else if (child instanceof Literal) {
|
||||
return new StringLiteral(((Literal) child).getStringValue());
|
||||
} else {
|
||||
throw new AnalysisException("unsupported argument " + child.toString());
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user