From d373ca7da16e572b93955aa121d8bc619655a6a2 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Mon, 9 Sep 2024 10:55:32 +0800 Subject: [PATCH] [enhancement](schema-change) Support light schema change on hash columns and agg key columns with varchar type to change length (#39319) (#40236) ## Proposed changes 1. Schema change should rebuild distribution info after modifying columns, especially distribution columns. Otherwise, dynamic partitioning may fail when checking the equality of distribution columns. 2. Support light schema change on hash key columns. For unique key or dup key columns, this cannot be enabled for now due to historical issues; see #39798. --- .../doris/alter/SchemaChangeHandler.java | 9 +-- .../apache/doris/alter/SchemaChangeJobV2.java | 1 + .../java/org/apache/doris/catalog/Column.java | 2 +- .../doris/catalog/HashDistributionInfo.java | 4 + .../org/apache/doris/catalog/OlapTable.java | 24 ++++++ ...amic_partition_mod_distribution_key.groovy | 75 +++++++++++++++++++ ...har_schema_change_with_distribution.groovy | 52 ------------- 7 files changed, 107 insertions(+), 60 deletions(-) create mode 100644 regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy delete mode 100644 regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 8caedae069..d9081fad01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -591,13 +591,7 @@ public class SchemaChangeHandler extends AlterHandler { if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR && modColumn.getDataType() == PrimitiveType.VARCHAR) { 
col.checkSchemaChangeAllowed(modColumn); - // If col and modColumn is not key, it allow light schema change, - // of course, olapTable has been enable light schema change - if (modColumn.isKey() || col.isKey()) { - lightSchemaChange = false; - } else { - lightSchemaChange = olapTable.getEnableLightSchemaChange(); - } + lightSchemaChange = olapTable.getEnableLightSchemaChange(); } if (col.isClusterKey()) { throw new DdlException("Can not modify cluster key column: " + col.getName()); @@ -2938,6 +2932,7 @@ public class SchemaChangeHandler extends AlterHandler { } olapTable.setIndexes(indexes); olapTable.rebuildFullSchema(); + olapTable.rebuildDistributionInfo(); } public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f11eab2d67..11a54905d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -679,6 +679,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } // rebuild table's full schema tbl.rebuildFullSchema(); + tbl.rebuildDistributionInfo(); // update bloom filter if (hasBfChange) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 80c5b739e0..5d22dd9510 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -674,7 +674,7 @@ public class Column implements Writable, GsonPostProcessable { } } - if (this.aggregationType != other.aggregationType) { + if (!Objects.equals(this.aggregationType, other.aggregationType)) { throw new DdlException("Can not change aggregation type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index b4dc9f85a0..43625096b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -180,4 +180,8 @@ public class HashDistributionInfo extends DistributionInfo { public RandomDistributionInfo toRandomDistributionInfo() { return new RandomDistributionInfo(bucketNum); } + + public void setDistributionColumns(List column) { + this.distributionColumns = column; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 9e2b0bca56..c7d87e3522 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -440,6 +440,30 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } } + public void rebuildDistributionInfo() { + if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) { + return; + } + HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo; + Set originalColumnsNames = + distributionInfo.getDistributionColumns() + .stream() + .map(Column::getName) + .collect(Collectors.toSet()); + + List newDistributionColumns = getBaseSchema() + .stream() + .filter(column -> originalColumnsNames.contains(column.getName())) + .map(Column::new) + .collect(Collectors.toList()); + distributionInfo.setDistributionColumns(newDistributionColumns); + + getPartitions() + .stream() + .map(Partition::getDistributionInfo) + .forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns)); + } + public boolean deleteIndexInfo(String indexName) { if (!indexNameToId.containsKey(indexName)) { return false; diff --git 
a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy new file mode 100644 index 0000000000..db44f59216 --- /dev/null +++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_dynamic_partition_mod_distribution_key") { + def options = new ClusterOptions() + options.setFeNum(2) + docker(options) { + // FIXME: due to historical bugs, this case will fail if adding k2 as dup key or unique key + // see https://github.com/apache/doris/issues/39798 + def keys = ["DUPLICATE KEY (k1)", "UNIQUE KEY (k1)", "AGGREGATE KEY (k1, k2)"] + def aggTypes = ["", "", "REPLACE"] + for (i in 0..<3) { + def key = keys.get(i) + def aggType = aggTypes.get(i) + def tableName = "test_dynamic_partition_mod_distribution_key" + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + k1 DATE NOT NULL, + k2 VARCHAR(20) NOT NULL, + v INT ${aggType} + ) ${key} + PARTITION BY RANGE(k1) () + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="1", + "dynamic_partition.start"="-3", + "dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1") + """ + + sql """ alter table ${tableName} modify column k1 comment 'new_comment_for_k1' """ + sql """ alter table ${tableName} modify column k2 varchar(255) """ + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """ + sql """ alter table ${tableName} set('dynamic_partition.end'='5') """ + result = sql "show partitions from ${tableName}" + for (def retry = 0; retry < 10; retry++) { // wait at most 10s (10 retries x 1s) + if (result.size() == 9) { + break; + } + logger.info("wait dynamic partition scheduler, sleep 1s") + sleep(1000) // sleep 1s + result = sql "show partitions from ${tableName}" + } + assertEquals(9, result.size()) + } + } +} \ No newline at end of file diff --git 
a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy b/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy deleted file mode 100644 index 5068f0aec4..0000000000 --- a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -suite ("test_varchar_schema_change_with_distribution") { - def tableName = "test_varchar_schema_change_with_distribution" - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - - sql """ - CREATE TABLE ${tableName} - ( - dt datetime NOT NULL COMMENT '分区日期', - citycode SMALLINT, - watts_range VARCHAR(20), - pv BIGINT SUM DEFAULT '0' - ) - AGGREGATE KEY(dt, citycode, watts_range) - PARTITION BY RANGE(dt)() - DISTRIBUTED BY HASH(dt, watts_range) BUCKETS 1 - PROPERTIES ( - "dynamic_partition.enable"="true", - "dynamic_partition.end"="3", - "dynamic_partition.buckets"="1", - "dynamic_partition.start"="-3", - "dynamic_partition.prefix"="p", - "dynamic_partition.time_unit"="HOUR", - "dynamic_partition.create_history_partition"="true", - "dynamic_partition.replication_allocation" = "tag.location.default: 1" - ); - """ - - test { - sql """ alter table ${tableName} modify column watts_range varchar(30) """ - exception "Can not modify distribution column" - } - - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - -} \ No newline at end of file