[fix](alter) Fix bug where the partition column of a unique key table could be modified (#7217)

Partition columns cannot be modified. The SQL sketch below illustrates the statement that this change now rejects.
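A minimal SQL sketch of the scenario, adapted from the new test case added in this commit (testAlterUniqueTablePartitionColumn); the table, column, and partition names are taken from that test, not from any production schema:

    CREATE TABLE test.unique_partition
    (
        k1 date,
        k2 int,
        v1 int
    )
    UNIQUE KEY(k1, k2)
    PARTITION BY RANGE(k1)
    (
        PARTITION p1 VALUES LESS THAN ('2020-02-01'),
        PARTITION p2 VALUES LESS THAN ('2020-03-01')
    )
    DISTRIBUTED BY HASH(k2) BUCKETS 3
    PROPERTIES('replication_num' = '1');

    -- k1 is the partition column; with this fix the following ALTER is expected to fail
    ALTER TABLE test.unique_partition MODIFY COLUMN k1 int KEY NULL;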
Author: Mingyu Chen
Date: 2021-11-26 10:16:01 +08:00
Committed by: GitHub
Parent: 948a2a738d
Commit: baa5d6089f
3 changed files with 94 additions and 65 deletions

File: MaterializedViewHandler.java

@@ -294,7 +294,7 @@ public class MaterializedViewHandler extends AlterHandler {
// remove tablet which has already inserted into TabletInvertedIndex
TabletInvertedIndex tabletInvertedIndex = Catalog.getCurrentInvertedIndex();
for (RollupJobV2 rollupJobV2 : rollupNameJobMap.values()) {
-for(MaterializedIndex index : rollupJobV2.getPartitionIdToRollupIndex().values()) {
+for (MaterializedIndex index : rollupJobV2.getPartitionIdToRollupIndex().values()) {
for (Tablet tablet : index.getTablets()) {
tabletInvertedIndex.deleteTablet(tablet.getId());
}
@@ -321,8 +321,9 @@ public class MaterializedViewHandler extends AlterHandler {
* @throws AnalysisException
*/
private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
-List<Column> mvColumns, Map<String, String> properties, OlapTable
-olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt)
+List<Column> mvColumns, Map<String, String> properties,
+OlapTable olapTable, Database db, long baseIndexId, KeysType mvKeysType,
+OriginStatement origStmt)
throws DdlException, AnalysisException {
if (mvKeysType == null) {
// assign rollup index's key type, same as base index's
@@ -343,9 +344,9 @@ public class MaterializedViewHandler extends AlterHandler {
long jobId = catalog.getNextId();
long mvIndexId = catalog.getNextId();
RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs,
baseIndexId, mvIndexId, baseIndexName, mvName,
mvColumns, baseSchemaHash, mvSchemaHash,
mvKeysType, mvShortKeyColumnCount, origStmt);
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
if (mvName.equals(newStorageFormatIndexName)) {
mvJob.setStorageFormat(TStorageFormat.V2);
@@ -392,9 +393,9 @@ public class MaterializedViewHandler extends AlterHandler {
Preconditions.checkState(baseReplica.getState() == Replica.ReplicaState.NORMAL, baseReplica.getState());
// replica's init state is ALTER, so that tablet report process will ignore its report
Replica mvReplica = new Replica(mvReplicaId, backendId, Replica.ReplicaState.ALTER,
Partition.PARTITION_INIT_VERSION, Partition
.PARTITION_INIT_VERSION_HASH,
mvSchemaHash);
newTablet.addReplica(mvReplica);
healthyReplicaNum++;
} // end for baseReplica
@@ -419,7 +420,7 @@ public class MaterializedViewHandler extends AlterHandler {
mvJob.addMVIndex(partitionId, mvIndex);
LOG.debug("create materialized view index {} based on index {} in partition {}",
mvIndexId, baseIndexId, partitionId);
} // end for partitions
LOG.info("finished to create materialized view job: {}", mvJob.getJobId());
@@ -482,7 +483,7 @@ public class MaterializedViewHandler extends AlterHandler {
if (partitionOrDistributedColumnName.contains(mvColumnItem.getBaseColumnName().toLowerCase())
&& mvColumnItem.getAggregationType() != null) {
throw new DdlException("The partition and distributed columns " + mvColumnItem.getBaseColumnName()
+ " must be key column in mv");
+ " must be key column in mv");
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
@@ -498,13 +499,15 @@ public class MaterializedViewHandler extends AlterHandler {
public List<Column> checkAndPrepareMaterializedView(AddRollupClause addRollupClause, OlapTable olapTable,
long baseIndexId, boolean changeStorageFormat)
-throws DdlException{
+throws DdlException {
String rollupIndexName = addRollupClause.getRollupName();
List<String> rollupColumnNames = addRollupClause.getColumnNames();
if (changeStorageFormat) {
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
rollupIndexName = newStorageFormatIndexName;
-List<Column> columns = olapTable.getSchemaByIndexId(baseIndexId);
+// Must get all columns including invisible columns.
+// Because in alter process, all columns must be considered.
+List<Column> columns = olapTable.getSchemaByIndexId(baseIndexId, true);
// create the same schema as base table
rollupColumnNames.clear();
for (Column column : columns) {
@@ -628,17 +631,17 @@ public class MaterializedViewHandler extends AlterHandler {
} else {
/*
* eg.
* Base Table's schema is (k1,k2,k3,k4,k5) dup key (k1,k2,k3).
* The following rollup is allowed:
* 1. (k1) dup key (k1)
* 2. (k2,k3) dup key (k2)
* 3. (k1,k2,k3) dup key (k1,k2)
*
* The following rollup is forbidden:
* 1. (k1) dup key (k2)
* 2. (k2,k3) dup key (k3,k2)
* 3. (k1,k2,k3) dup key (k2,k3)
*/
// user specify the duplicate keys for rollup index
List<String> dupKeys = addRollupClause.getDupKeys();
if (dupKeys.size() > rollupColumnNames.size()) {
@@ -743,7 +746,7 @@ public class MaterializedViewHandler extends AlterHandler {
}
public void processDropMaterializedView(DropMaterializedViewStmt dropMaterializedViewStmt, Database db,
OlapTable olapTable) throws DdlException, MetaNotFoundException {
olapTable.writeLock();
try {
// check table state
@@ -1019,7 +1022,7 @@ public class MaterializedViewHandler extends AlterHandler {
// cancel rollup
cancelledJobs.add(rollupJob);
LOG.warn("cancel rollup[{}] cause bad rollup job[{}]",
((RollupJob) rollupJob).getRollupIndexName(), rollupJob.getTableId());
}
}
break;
@@ -1173,7 +1176,7 @@ public class MaterializedViewHandler extends AlterHandler {
if (alterClauseOptional.isPresent()) {
if (alterClauseOptional.get() instanceof AddRollupClause) {
processBatchAddRollup(alterClauses, db, olapTable);
} else if (alterClauseOptional.get() instanceof DropRollupClause) {
processBatchDropRollup(alterClauses, db, olapTable);
} else {
Preconditions.checkState(false);

File: SchemaChangeHandler.java

@@ -143,7 +143,7 @@ public class SchemaChangeHandler extends AlterHandler {
Set<String> newColNameSet = Sets.newHashSet(column.getName());
addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId,
indexSchemaMap, newColNameSet);
}
private void processAddColumn(AddColumnClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
@@ -167,7 +167,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
private void processAddColumns(AddColumnsClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
List<Column> columns = alterClause.getColumns();
String targetIndexName = alterClause.getRollupName();
checkIndexExists(olapTable, targetIndexName);
@@ -218,14 +218,14 @@ public class SchemaChangeHandler extends AlterHandler {
}
private void processDropColumn(DropColumnClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes) throws DdlException {
String dropColName = alterClause.getColName();
String targetIndexName = alterClause.getRollupName();
checkIndexExists(olapTable, targetIndexName);
String baseIndexName = olapTable.getName();
checkAssignedTargetIndexName(baseIndexName, targetIndexName);
/*
* UNIQUE:
* Can not drop any key column.
@@ -242,11 +242,11 @@ public class SchemaChangeHandler extends AlterHandler {
break;
}
}
if (isKey) {
throw new DdlException("Can not drop key column in Unique data model table");
}
} else if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
if (null == targetIndexName) {
// drop column in base table
@@ -287,7 +287,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
Iterator<Index> it = indexes.iterator();
-while(it.hasNext()){
+while (it.hasNext()) {
Index index = it.next();
for (String indexCol : index.getColumns()) {
if (dropColName.equalsIgnoreCase(indexCol)) {
@@ -422,9 +422,10 @@ public class SchemaChangeHandler extends AlterHandler {
newSchema.set(modColIndex, modColumn);
}
}
// User can modify column type and column position
private void processModifyColumn(ModifyColumnClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
Column modColumn = alterClause.getColumn();
if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
if (modColumn.isKey() && null != modColumn.getAggregationType()) {
@@ -576,7 +577,7 @@ public class SchemaChangeHandler extends AlterHandler {
break;
}
}
Preconditions.checkState(modColIndex != -1);
// replace the old column
Column oldCol = otherIndexSchema.get(modColIndex);
@@ -599,9 +600,9 @@ public class SchemaChangeHandler extends AlterHandler {
* a prefix in the name of these modified columns.
* This prefix only exist during the schema change process. Once the schema change is finished,
* it will be removed.
*
* After adding this prefix, modify a column is just same as 'add' a column.
*
* And if the column type is not changed, the same column name is still to the same column type,
* so no need to add prefix.
*/
@@ -642,7 +643,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
private void processReorderColumn(ReorderColumnsClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
List<String> orderedColNames = alterClause.getColumnsByPos();
String targetIndexName = alterClause.getRollupName();
checkIndexExists(olapTable, targetIndexName);
@@ -757,17 +758,17 @@ public class SchemaChangeHandler extends AlterHandler {
long targetIndexId, long baseIndexId,
Map<Long, LinkedList<Column>> indexSchemaMap,
Set<String> newColNameSet) throws DdlException {
String newColName = newColumn.getName();
// check the validation of aggregation method on column.
// also fill the default aggregation method if not specified.
if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
if (newColumn.isKey() && newColumn.getAggregationType() != null) {
throw new DdlException("Can not assign aggregation method on key column: " + newColName);
throw new DdlException("Can not assign aggregation method on key column: " + newColName);
} else if (null == newColumn.getAggregationType()) {
newColumn.setIsKey(true);
} else if (newColumn.getAggregationType() == AggregateType.SUM
&& newColumn.getDefaultValue() != null && !newColumn.getDefaultValue().equals("0")) {
throw new DdlException("The default value of '" + newColName + "' with SUM aggregation function must be zero");
}
} else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
@@ -803,7 +804,7 @@ public class SchemaChangeHandler extends AlterHandler {
// do not support adding new column which already exist in base schema.
List<Column> baseSchema = olapTable.getBaseSchema(true);
boolean found = false;
for (Column column : baseSchema) {
if (column.getName().equalsIgnoreCase(newColName)) {
found = true;
break;
@@ -815,10 +816,10 @@ public class SchemaChangeHandler extends AlterHandler {
} else if (newColName.equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new DdlException("Can not enable sequence column support, already supported sequence column.");
} else {
throw new DdlException("Can not add column which already exists in base table: " + newColName);
throw new DdlException("Can not add column which already exists in base table: " + newColName);
}
}
/*
* add new column to indexes.
* UNIQUE:
@@ -852,7 +853,7 @@ public class SchemaChangeHandler extends AlterHandler {
// 2. add to rollup
modIndexSchema = indexSchemaMap.get(targetIndexId);
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false);
}
} else if (KeysType.DUP_KEYS == olapTable.getKeysType()) {
if (targetIndexId == -1L) {
// add to base index
@@ -867,7 +868,7 @@ public class SchemaChangeHandler extends AlterHandler {
if (newColumn.isKey()) {
/*
* if add column in rollup is key,
* then put the column in base table as the last key column
*/
modIndexSchema = indexSchemaMap.get(baseIndexId);
@@ -882,7 +883,7 @@ public class SchemaChangeHandler extends AlterHandler {
// 1. add to base index first
List<Column> modIndexSchema = indexSchemaMap.get(baseIndexId);
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true);
if (targetIndexId == -1L) {
// no specified target index. return
return;
@@ -904,7 +905,7 @@ public class SchemaChangeHandler extends AlterHandler {
* So that k1 will be added to base index 'twice', and we just ignore this repeat adding.
*/
private void checkAndAddColumn(List<Column> modIndexSchema, Column newColumn, ColumnPosition columnPos,
Set<String> newColNameSet, boolean isBaseIndex) throws DdlException {
int posIndex = -1;
int lastVisibleIdx = -1;
String newColName = newColumn.getName();
@@ -1040,7 +1041,7 @@ public class SchemaChangeHandler extends AlterHandler {
double bfFpp = 0;
try {
bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(propertyMap,
indexSchemaMap.get(olapTable.getBaseIndexId()), olapTable.getKeysType());
bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(propertyMap);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
@@ -1095,7 +1096,7 @@ public class SchemaChangeHandler extends AlterHandler {
if (bfColumns == null) {
bfFpp = 0;
}
// property 3: timeout
long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
@@ -1123,7 +1124,9 @@ public class SchemaChangeHandler extends AlterHandler {
Map<Long, Short> indexIdToShortKeyColumnCount = Maps.newHashMap();
Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
for (Long alterIndexId : indexSchemaMap.keySet()) {
-List<Column> originSchema = olapTable.getSchemaByIndexId(alterIndexId);
+// Must get all columns including invisible columns.
+// Because in alter process, all columns must be considered.
+List<Column> originSchema = olapTable.getSchemaByIndexId(alterIndexId, true);
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
Set<Column> needAlterColumns = Sets.newHashSet();
@@ -1276,7 +1279,7 @@ public class SchemaChangeHandler extends AlterHandler {
// 5. calc short key
short newShortKeyColumnCount = Catalog.calcShortKeyColumnCount(alterSchema,
indexIdToProperties.get(alterIndexId));
LOG.debug("alter index[{}] short key column count: {}", alterIndexId, newShortKeyColumnCount);
indexIdToShortKeyColumnCount.put(alterIndexId, newShortKeyColumnCount);
@@ -1341,7 +1344,7 @@ public class SchemaChangeHandler extends AlterHandler {
for (Replica originReplica : originReplicas) {
long shadowReplicaId = catalog.getNextId();
long backendId = originReplica.getBackendId();
if (originReplica.getState() == Replica.ReplicaState.CLONE
|| originReplica.getState() == Replica.ReplicaState.DECOMMISSION
|| originReplica.getLastFailedVersion() > 0) {
@@ -1375,12 +1378,12 @@ public class SchemaChangeHandler extends AlterHandler {
"tablet " + originTabletId + " has few healthy replica: " + healthyReplicaNum);
}
}
schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
} // end for partition
schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, newIndexName, newSchemaVersion, newSchemaHash, newShortKeyColumnCount, entry.getValue());
} // end for index
// set table state
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
@@ -1433,8 +1436,8 @@ public class SchemaChangeHandler extends AlterHandler {
for (AlterJob alterJob : alterJobs.values()) {
SchemaChangeJob schemaChangeJob = (SchemaChangeJob) alterJob;
if (schemaChangeJob.getState() != JobState.FINISHING
&& schemaChangeJob.getState() != JobState.FINISHED
&& schemaChangeJob.getState() != JobState.CANCELLED) {
// cancel the old alter table job
cancelledJobs.add(schemaChangeJob);
@@ -1475,13 +1478,13 @@ public class SchemaChangeHandler extends AlterHandler {
int res = schemaChangeJob.checkOrResendClearTasks();
if (res != 0) {
if (res == -1) {
LOG.warn("schema change job is in finishing state,but could not finished, "
LOG.warn("schema change job is in finishing state,but could not finished, "
+ "just finish it, maybe a fatal error {}", alterJob);
} else {
LOG.info("send clear tasks to all be for job [{}] successfully, "
+ "set status to finished", alterJob);
}
finishedJobs.add(alterJob);
}
} else {
@@ -1793,7 +1796,7 @@ public class SchemaChangeHandler extends AlterHandler {
return;
}
-for(Partition partition: partitions) {
+for (Partition partition : partitions) {
updatePartitionInMemoryMeta(db, olapTable.getName(), partition.getName(), isInMemory);
}
@@ -1818,7 +1821,7 @@ public class SchemaChangeHandler extends AlterHandler {
return;
}
-for(String partitionName : partitionNames) {
+for (String partitionName : partitionNames) {
try {
updatePartitionInMemoryMeta(db, olapTable.getName(), partitionName, isInMemory);
} catch (Exception e) {
@@ -1865,10 +1868,10 @@ public class SchemaChangeHandler extends AlterHandler {
int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
AgentBatchTask batchTask = new AgentBatchTask();
-for(Map.Entry<Long, Set<Pair<Long, Integer>>> kv: beIdToTabletIdWithHash.entrySet()) {
+for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
countDownLatch.addMark(kv.getKey(), kv.getValue());
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(),
isInMemory, countDownLatch);
batchTask.addTask(task);
}
if (!FeConstants.runningUnitTest) {
@@ -2014,7 +2017,7 @@ public class SchemaChangeHandler extends AlterHandler {
Iterator<Index> itr = indexes.iterator();
while (itr.hasNext()) {
Index idx = itr.next();
if (idx.getIndexName().equalsIgnoreCase(alterClause.getIndexName())) {
itr.remove();
break;

File: AlterTest.java

@@ -727,6 +727,29 @@ public class AlterTest {
alterTable(changeOrderStmt, false);
}
@Test
public void testAlterUniqueTablePartitionColumn() throws Exception {
createTable("CREATE TABLE test.unique_partition\n" +
"(\n" +
" k1 date,\n" +
" k2 int,\n" +
" v1 int\n" +
")\n" +
"UNIQUE KEY(k1, k2)\n" +
"PARTITION BY RANGE(k1)\n" +
"(\n" +
" PARTITION p1 values less than('2020-02-01'),\n" +
" PARTITION p2 values less than('2020-03-01')\n" +
")\n" +
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
"PROPERTIES('replication_num' = '1');");
// partition key can not be changed.
// this test is also for validating a bug fix about invisible columns(delete flag column)
String changeOrderStmt = "ALTER TABLE test.unique_partition modify column k1 int key null";
alterTable(changeOrderStmt, true);
}
private boolean checkAllTabletsExists(List<Long> tabletIds) {
TabletInvertedIndex invertedIndex = Catalog.getCurrentCatalog().getTabletInvertedIndex();
for (long tabletId : tabletIds) {