[bugfix](hive) Partition fields in wrong order for 2.1 (#35554)

Backport of #35322
This commit is contained in:
wuwenchi
2024-05-28 22:43:46 +08:00
committed by GitHub
parent 1fab4b63ec
commit 9fae08254d
4 changed files with 117 additions and 28 deletions

View File

@ -34,6 +34,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.FixedRangePartition;
import org.apache.doris.nereids.trees.plans.commands.info.InPartition;
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
@ -160,6 +161,8 @@ public class PartitionTableInfo {
* @param isEnableMergeOnWrite whether enable merge on write
*/
public void validatePartitionInfo(
String engineName,
List<ColumnDefinition> columns,
Map<String, ColumnDefinition> columnMap,
Map<String, String> properties,
ConnectContext ctx,
@ -191,6 +194,27 @@ public class PartitionTableInfo {
"Duplicated partition column " + duplicatesKeys.get(0));
}
if (engineName.equals(CreateTableInfo.ENGINE_HIVE)) {
// 1. Cannot set all columns as partitioning columns
// 2. The partition field must be at the end of the schema
// 3. The order of partition fields in the schema
// must be consistent with the order defined in `PARTITIONED BY LIST()`
if (partitionColumns.size() == columns.size()) {
throw new AnalysisException("Cannot set all columns as partitioning columns.");
}
List<ColumnDefinition> partitionInSchema = columns.subList(
columns.size() - partitionColumns.size(), columns.size());
for (int i = 0; i < partitionInSchema.size(); i++) {
if (!partitionColumns.contains(partitionInSchema.get(i).getName())) {
throw new AnalysisException("The partition field must be at the end of the schema.");
}
if (!partitionInSchema.get(i).getName().equals(partitionColumns.get(i))) {
throw new AnalysisException("The order of partition fields in the schema "
+ "must be consistent with the order defined in `PARTITIONED BY LIST()`");
}
}
}
if (partitionDefs != null) {
if (!checkPartitionsTypes()) {
throw new AnalysisException(

View File

@ -72,6 +72,16 @@ import java.util.stream.Collectors;
* table info in creating table.
*/
public class CreateTableInfo {
public static final String ENGINE_OLAP = "olap";
public static final String ENGINE_JDBC = "jdbc";
public static final String ENGINE_ELASTICSEARCH = "elasticsearch";
public static final String ENGINE_ODBC = "odbc";
public static final String ENGINE_MYSQL = "mysql";
public static final String ENGINE_BROKER = "broker";
public static final String ENGINE_HIVE = "hive";
public static final String ENGINE_ICEBERG = "iceberg";
private final boolean ifNotExists;
private String ctlName;
private String dbName;
@ -208,7 +218,7 @@ public class CreateTableInfo {
properties = Maps.newHashMap();
}
if (engineName.equalsIgnoreCase("olap")) {
if (engineName.equalsIgnoreCase(ENGINE_OLAP)) {
if (distribution == null) {
distribution = new DistributionDescriptor(false, true, FeConstants.default_bucket_num, null);
}
@ -221,7 +231,7 @@ public class CreateTableInfo {
throw new AnalysisException(e.getMessage(), e);
}
if (engineName.equals("olap")) {
if (engineName.equals(ENGINE_OLAP)) {
if (!ctlName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
throw new AnalysisException("Cannot create olap table out of internal catalog."
+ " Make sure 'engine' type is specified when use the catalog: " + ctlName);
@ -274,7 +284,7 @@ public class CreateTableInfo {
}
});
if (engineName.equalsIgnoreCase("olap")) {
if (engineName.equalsIgnoreCase(ENGINE_OLAP)) {
properties = PropertyAnalyzer.rewriteReplicaAllocationProperties(ctlName, dbName,
properties);
boolean enableDuplicateWithoutKeysByDefault = false;
@ -437,7 +447,8 @@ public class CreateTableInfo {
// validate partition
partitionTableInfo.extractPartitionColumns();
partitionTableInfo.validatePartitionInfo(columnMap, properties, ctx, isEnableMergeOnWrite, isExternal);
partitionTableInfo.validatePartitionInfo(
engineName, columns, columnMap, properties, ctx, isEnableMergeOnWrite, isExternal);
// validate distribution descriptor
distribution.updateCols(columns.get(0).getName());
@ -471,28 +482,29 @@ public class CreateTableInfo {
throw new AnalysisException(engineName + " catalog doesn't support rollup tables.");
}
if (engineName.equalsIgnoreCase("iceberg") && distribution != null) {
if (engineName.equalsIgnoreCase(ENGINE_ICEBERG) && distribution != null) {
throw new AnalysisException(
"Iceberg doesn't support 'DISTRIBUTE BY', "
+ "and you can use 'bucket(num, column)' in 'PARTITIONED BY'.");
}
for (ColumnDefinition columnDef : columns) {
if (!columnDef.isNullable()
&& engineName.equalsIgnoreCase("hive")) {
&& engineName.equalsIgnoreCase(ENGINE_HIVE)) {
throw new AnalysisException(engineName + " catalog doesn't support column with 'NOT NULL'.");
}
columnDef.setIsKey(true);
columnDef.setAggType(AggregateType.NONE);
}
// TODO: support iceberg partition check
if (engineName.equalsIgnoreCase("hive")) {
partitionTableInfo.validatePartitionInfo(columnMap, properties, ctx, false, true);
if (engineName.equalsIgnoreCase(ENGINE_HIVE)) {
partitionTableInfo.validatePartitionInfo(
engineName, columns, columnMap, properties, ctx, false, true);
}
}
// validate column
try {
if (!engineName.equals("elasticsearch") && columns.isEmpty()) {
if (!engineName.equals(ENGINE_ELASTICSEARCH) && columns.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
}
} catch (Exception e) {
@ -502,7 +514,7 @@ public class CreateTableInfo {
final boolean finalEnableMergeOnWrite = isEnableMergeOnWrite;
Set<String> keysSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
keysSet.addAll(keys);
columns.forEach(c -> c.validate(engineName.equals("olap"), keysSet, finalEnableMergeOnWrite,
columns.forEach(c -> c.validate(engineName.equals(ENGINE_OLAP), keysSet, finalEnableMergeOnWrite,
keysType));
// validate index
@ -512,7 +524,7 @@ public class CreateTableInfo {
for (IndexDefinition indexDef : indexes) {
indexDef.validate();
if (!engineName.equalsIgnoreCase("olap")) {
if (!engineName.equalsIgnoreCase(ENGINE_OLAP)) {
throw new AnalysisException(
"index only support in olap engine at current version.");
}
@ -553,11 +565,11 @@ public class CreateTableInfo {
}
if (catalog instanceof InternalCatalog) {
engineName = "olap";
engineName = ENGINE_OLAP;
} else if (catalog instanceof HMSExternalCatalog) {
engineName = "hive";
engineName = ENGINE_HIVE;
} else if (catalog instanceof IcebergExternalCatalog) {
engineName = "iceberg";
engineName = ENGINE_ICEBERG;
} else {
throw new AnalysisException("Current catalog does not support create table: " + ctlName);
}
@ -573,7 +585,7 @@ public class CreateTableInfo {
paddingEngineName(catalogName, ctx);
this.columns = Utils.copyRequiredMutableList(columns);
// bucket num is hard coded 10 to be consistent with legacy planner
if (engineName.equals("olap") && this.distribution == null) {
if (engineName.equals(ENGINE_OLAP) && this.distribution == null) {
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
throw new AnalysisException("Cannot create olap table out of internal catalog."
+ " Make sure 'engine' type is specified when use the catalog: " + catalogName);
@ -585,9 +597,9 @@ public class CreateTableInfo {
}
private void checkEngineName() {
if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
|| engineName.equals("elasticsearch") || engineName.equals("hive") || engineName.equals("iceberg")
|| engineName.equals("jdbc")) {
if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC) || engineName.equals(ENGINE_BROKER)
|| engineName.equals(ENGINE_ELASTICSEARCH) || engineName.equals(ENGINE_HIVE)
|| engineName.equals(ENGINE_ICEBERG) || engineName.equals(ENGINE_JDBC)) {
if (!isExternal) {
// this is for compatibility
isExternal = true;
@ -596,14 +608,14 @@ public class CreateTableInfo {
if (isExternal) {
throw new AnalysisException(
"Do not support external table with engine name = olap");
} else if (!engineName.equals("olap")) {
} else if (!engineName.equals(ENGINE_OLAP)) {
throw new AnalysisException(
"Do not support table with engine name = " + engineName);
}
}
if (!Config.enable_odbc_mysql_broker_table && (engineName.equals("odbc")
|| engineName.equals("mysql") || engineName.equals("broker"))) {
if (!Config.enable_odbc_mysql_broker_table && (engineName.equals(ENGINE_ODBC)
|| engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_BROKER))) {
throw new AnalysisException("odbc, mysql and broker table is no longer supported."
+ " For odbc and mysql external table, use jdbc table or jdbc catalog instead."
+ " For broker table, use table valued function instead."
@ -755,18 +767,18 @@ public class CreateTableInfo {
// TODO should move this code to validate function
// EsUtil.analyzePartitionAndDistributionDesc only accept DistributionDesc and PartitionDesc
if (engineName.equals("elasticsearch")) {
if (engineName.equals(ENGINE_ELASTICSEARCH)) {
try {
EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc);
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
} else if (!engineName.equals("olap")) {
if (!engineName.equals("hive") && distributionDesc != null) {
} else if (!engineName.equals(ENGINE_OLAP)) {
if (!engineName.equals(ENGINE_HIVE) && distributionDesc != null) {
throw new AnalysisException("Create " + engineName
+ " table should not contain distribution desc");
}
if (!engineName.equals("hive") && !engineName.equals("iceberg") && partitionDesc != null) {
if (!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_ICEBERG) && partitionDesc != null) {
throw new AnalysisException("Create " + engineName
+ " table should not contain partition desc");
}