diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 8caedae069..d9081fad01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -591,13 +591,7 @@ public class SchemaChangeHandler extends AlterHandler { if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR && modColumn.getDataType() == PrimitiveType.VARCHAR) { col.checkSchemaChangeAllowed(modColumn); - // If col and modColumn is not key, it allow light schema change, - // of course, olapTable has been enable light schema change - if (modColumn.isKey() || col.isKey()) { - lightSchemaChange = false; - } else { - lightSchemaChange = olapTable.getEnableLightSchemaChange(); - } + lightSchemaChange = olapTable.getEnableLightSchemaChange(); } if (col.isClusterKey()) { throw new DdlException("Can not modify cluster key column: " + col.getName()); @@ -2938,6 +2932,7 @@ public class SchemaChangeHandler extends AlterHandler { } olapTable.setIndexes(indexes); olapTable.rebuildFullSchema(); + olapTable.rebuildDistributionInfo(); } public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f11eab2d67..11a54905d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -679,6 +679,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } // rebuild table's full schema tbl.rebuildFullSchema(); + tbl.rebuildDistributionInfo(); // update bloom filter if (hasBfChange) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 80c5b739e0..5d22dd9510 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -674,7 +674,7 @@ public class Column implements Writable, GsonPostProcessable { } } - if (this.aggregationType != other.aggregationType) { + if (!Objects.equals(this.aggregationType, other.aggregationType)) { throw new DdlException("Can not change aggregation type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index b4dc9f85a0..43625096b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -180,4 +180,8 @@ public class HashDistributionInfo extends DistributionInfo { public RandomDistributionInfo toRandomDistributionInfo() { return new RandomDistributionInfo(bucketNum); } + + public void setDistributionColumns(List column) { + this.distributionColumns = column; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 9e2b0bca56..c7d87e3522 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -440,6 +440,30 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } } + public void rebuildDistributionInfo() { + if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) { + return; + } + HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo; + Set originalColumnsNames = + distributionInfo.getDistributionColumns() + .stream() + .map(Column::getName) + .collect(Collectors.toSet()); + + List newDistributionColumns = getBaseSchema() + .stream() + .filter(column -> originalColumnsNames.contains(column.getName())) + .map(Column::new) + .collect(Collectors.toList()); + distributionInfo.setDistributionColumns(newDistributionColumns); + + getPartitions() + .stream() + .map(Partition::getDistributionInfo) + .forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns)); + } + public boolean deleteIndexInfo(String indexName) { if (!indexNameToId.containsKey(indexName)) { return false; diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy new file mode 100644 index 0000000000..db44f59216 --- /dev/null +++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_dynamic_partition_mod_distribution_key") { + def options = new ClusterOptions() + options.setFeNum(2) + docker(options) { + // FIXME: for historical bugs, this case will fail if adding k2 as dup key or unique key + // see in https://github.com/apache/doris/issues/39798 + def keys = ["DUPLICATE KEY (k1)", "UNIQUE KEY (k1)", "AGGREGATE KEY (k1, k2)"] + def aggTypes = ["", "", "REPLACE"] + for (i in 0..<3) { + def key = keys.get(i) + def aggType = aggTypes.get(i) + def tableName = "test_dynamic_partition_mod_distribution_key" + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + k1 DATE NOT NULL, + k2 VARCHAR(20) NOT NULL, + v INT ${aggType} + ) ${key} + PARTITION BY RANGE(k1) () + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="1", + "dynamic_partition.start"="-3", + "dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1") + """ + + sql """ alter table ${tableName} modify column k1 comment 'new_comment_for_k1' """ + sql """ alter table ${tableName} modify column k2 varchar(255) """ + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """ + sql """ alter table ${tableName} set('dynamic_partition.end'='5') """ + result = sql "show partitions from ${tableName}" + for (def retry = 0; retry < 10; retry++) { // at most wait 120s + if (result.size() == 9) { + break; + } + logger.info("wait dynamic partition scheduler, sleep 1s") + sleep(1000) // sleep 1s + result = sql "show partitions from ${tableName}" + } + assertEquals(9, result.size()) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy b/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy deleted file mode 100644 index 5068f0aec4..0000000000 --- a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite ("test_varchar_schema_change_with_distribution") { - def tableName = "test_varchar_schema_change_with_distribution" - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - - sql """ - CREATE TABLE ${tableName} - ( - dt datetime NOT NULL COMMENT 'εˆ†εŒΊζ—₯期', - citycode SMALLINT, - watts_range VARCHAR(20), - pv BIGINT SUM DEFAULT '0' - ) - AGGREGATE KEY(dt, citycode, watts_range) - PARTITION BY RANGE(dt)() - DISTRIBUTED BY HASH(dt, watts_range) BUCKETS 1 - PROPERTIES ( - "dynamic_partition.enable"="true", - "dynamic_partition.end"="3", - "dynamic_partition.buckets"="1", - "dynamic_partition.start"="-3", - "dynamic_partition.prefix"="p", - "dynamic_partition.time_unit"="HOUR", - "dynamic_partition.create_history_partition"="true", - "dynamic_partition.replication_allocation" = "tag.location.default: 1" - ); - """ - - test { - sql """ alter table ${tableName} modify column watts_range varchar(30) """ - exception "Can not modify distribution column" - } - - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - -} \ No newline at end of file