[enhancement](schema-change) Support light schema change on hash columns and agg key columns with varchar type to change length (#39319) (#40236)

## Proposed changes

1. Schema change should rebuild distribution info after modifying
columns, especially distribution columns. Or it may cause dynamic
partition failed when checking distribution columns' equality.
2. Support hash key columns to do light schema change. For unique key or
dup key columns, could not be enabled temporarily due to some historical
reasons. See #39798 .
This commit is contained in:
Siyang Tang
2024-09-09 10:55:32 +08:00
committed by GitHub
parent e0b22b5104
commit d373ca7da1
7 changed files with 107 additions and 60 deletions

View File

@ -591,13 +591,7 @@ public class SchemaChangeHandler extends AlterHandler {
if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR
&& modColumn.getDataType() == PrimitiveType.VARCHAR) {
col.checkSchemaChangeAllowed(modColumn);
// If col and modColumn is not key, it allow light schema change,
// of course, olapTable has been enable light schema change
if (modColumn.isKey() || col.isKey()) {
lightSchemaChange = false;
} else {
lightSchemaChange = olapTable.getEnableLightSchemaChange();
}
lightSchemaChange = olapTable.getEnableLightSchemaChange();
}
if (col.isClusterKey()) {
throw new DdlException("Can not modify cluster key column: " + col.getName());
@ -2938,6 +2932,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
olapTable.setIndexes(indexes);
olapTable.rebuildFullSchema();
olapTable.rebuildDistributionInfo();
}
public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info)

View File

@ -679,6 +679,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
// rebuild table's full schema
tbl.rebuildFullSchema();
tbl.rebuildDistributionInfo();
// update bloom filter
if (hasBfChange) {

View File

@ -674,7 +674,7 @@ public class Column implements Writable, GsonPostProcessable {
}
}
if (this.aggregationType != other.aggregationType) {
if (!Objects.equals(this.aggregationType, other.aggregationType)) {
throw new DdlException("Can not change aggregation type");
}

View File

@ -180,4 +180,8 @@ public class HashDistributionInfo extends DistributionInfo {
public RandomDistributionInfo toRandomDistributionInfo() {
return new RandomDistributionInfo(bucketNum);
}
public void setDistributionColumns(List<Column> column) {
this.distributionColumns = column;
}
}

View File

@ -440,6 +440,30 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
}
public void rebuildDistributionInfo() {
if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) {
return;
}
HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo;
Set<String> originalColumnsNames =
distributionInfo.getDistributionColumns()
.stream()
.map(Column::getName)
.collect(Collectors.toSet());
List<Column> newDistributionColumns = getBaseSchema()
.stream()
.filter(column -> originalColumnsNames.contains(column.getName()))
.map(Column::new)
.collect(Collectors.toList());
distributionInfo.setDistributionColumns(newDistributionColumns);
getPartitions()
.stream()
.map(Partition::getDistributionInfo)
.forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns));
}
public boolean deleteIndexInfo(String indexName) {
if (!indexNameToId.containsKey(indexName)) {
return false;