[fix](alter inverted index) fix incorrect CreateTime of 'show alter' query result after fe restart (#17043)
For add or drop inverted index, when replay the logModifyTableAddOrDropInvertedIndices will new a schema change job, that has a new CreateTime, here should new a schema change job when not replay log.
This commit is contained in:
@ -2458,34 +2458,6 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
throw new DdlException("Nothing is changed. please check your alter stmt.");
|
||||
}
|
||||
|
||||
long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
|
||||
|
||||
// after modify tablet meta, we will create a WAITING_TXN state schema change job v2
|
||||
// to handle alter inverted index task
|
||||
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(
|
||||
jobId, db.getId(), olapTable.getId(),
|
||||
olapTable.getName(), timeoutSecond * 1000);
|
||||
schemaChangeJob.setAlterInvertedIndexInfo(hasInvertedIndexChange, isDropInvertedIndex, alterInvertedIndexes);
|
||||
schemaChangeJob.setOriIndexInfo(oriIndexes);
|
||||
// only V2 support index, so if there is index changed, storage format must be V2
|
||||
schemaChangeJob.setStorageFormat(TStorageFormat.V2);
|
||||
|
||||
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
|
||||
long originIndexId = entry.getKey();
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
long partitionId = partition.getId();
|
||||
schemaChangeJob.addPartitionOriginIndexIdMap(partitionId, originIndexId);
|
||||
} // end for partition
|
||||
String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
|
||||
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
|
||||
// 1. get new schema version/schema version hash, short key column count
|
||||
int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
|
||||
int newSchemaVersion = currentSchemaVersion + 1;
|
||||
schemaChangeJob.addIndexSchema(originIndexId, originIndexId, newIndexName, newSchemaVersion,
|
||||
currentIndexMeta.getSchemaHash(),
|
||||
currentIndexMeta.getShortKeyColumnCount(), entry.getValue());
|
||||
} // end for index
|
||||
|
||||
//update base index schema
|
||||
updateBaseIndexSchema(olapTable, indexSchemaMap, indexes);
|
||||
|
||||
@ -2495,12 +2467,43 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
alterInvertedIndexes, isDropInvertedIndex, oriIndexes, jobId);
|
||||
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
|
||||
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
|
||||
|
||||
// after modify tablet meta, we will create a WAITING_TXN state schema change job v2
|
||||
// to handle alter inverted index task
|
||||
long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
|
||||
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(
|
||||
jobId, db.getId(), olapTable.getId(),
|
||||
olapTable.getName(), timeoutSecond * 1000);
|
||||
schemaChangeJob.setAlterInvertedIndexInfo(hasInvertedIndexChange,
|
||||
isDropInvertedIndex, alterInvertedIndexes);
|
||||
schemaChangeJob.setOriIndexInfo(oriIndexes);
|
||||
// only V2 support index, so if there is index changed, storage format must be V2
|
||||
schemaChangeJob.setStorageFormat(TStorageFormat.V2);
|
||||
|
||||
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
|
||||
long originIndexId = entry.getKey();
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
long partitionId = partition.getId();
|
||||
schemaChangeJob.addPartitionOriginIndexIdMap(partitionId, originIndexId);
|
||||
} // end for partition
|
||||
String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
|
||||
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
|
||||
// 1. get new schema version/schema version hash, short key column count
|
||||
int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
|
||||
int newSchemaVersion = currentSchemaVersion + 1;
|
||||
schemaChangeJob.addIndexSchema(originIndexId, originIndexId, newIndexName, newSchemaVersion,
|
||||
currentIndexMeta.getSchemaHash(),
|
||||
currentIndexMeta.getShortKeyColumnCount(), entry.getValue());
|
||||
} // end for index
|
||||
|
||||
// set Job state then add job
|
||||
schemaChangeJob.setJobState(AlterJobV2.JobState.WAITING_TXN);
|
||||
this.addAlterJobV2(schemaChangeJob);
|
||||
LOG.debug("logAlterJob schemaChangeJob:{}", schemaChangeJob);
|
||||
Env.getCurrentEnv().getEditLog().logAlterJob(schemaChangeJob);
|
||||
}
|
||||
// set Job state then add job
|
||||
schemaChangeJob.setJobState(AlterJobV2.JobState.WAITING_TXN);
|
||||
this.addAlterJobV2(schemaChangeJob);
|
||||
LOG.info("finished modify table's meta for add or drop inverted index. table: {}, is replay: {}",
|
||||
olapTable.getName(), isReplay);
|
||||
LOG.info("finished modify table's meta for add or drop inverted index. table: {}, job: {}, is replay: {}",
|
||||
olapTable.getName(), jobId, isReplay);
|
||||
}
|
||||
|
||||
public void replaymodifyTableAddOrDropInvertedIndices(
|
||||
|
||||
@ -927,7 +927,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
}
|
||||
jobState = JobState.FINISHED;
|
||||
this.finishedTimeMs = replayedJob.finishedTimeMs;
|
||||
LOG.info("replay finished schema change job: {}", jobId);
|
||||
LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user