[Alter] Fix bug of assertion failure when submitting schema change job (#3181)
When creating a schema change job, we will create a corresponding shadow replica for each replica. Here we should check the state of the replica and only create replicas in the normal state. The process here may need to be modified later. We should completely allow users to submit alter jobs under any circumstances, and then in the job scheduling process, dynamically detect changes in the replicas and do replica repairs, instead of forcing a check on submission.
This commit is contained in:
@ -376,7 +376,7 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
baseReplica.getId(), baseTabletId, baseReplica.getState(), baseReplica.getLastFailedVersion());
|
||||
continue;
|
||||
}
|
||||
Preconditions.checkState(baseReplica.getState() == Replica.ReplicaState.NORMAL);
|
||||
Preconditions.checkState(baseReplica.getState() == Replica.ReplicaState.NORMAL, baseReplica.getState());
|
||||
// replica's init state is ALTER, so that tablet report process will ignore its report
|
||||
Replica mvReplica = new Replica(mvReplicaId, backendId, Replica.ReplicaState.ALTER,
|
||||
Partition.PARTITION_INIT_VERSION, Partition
|
||||
|
||||
@ -1082,6 +1082,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
long shadowIndexId = catalog.getNextId();
|
||||
|
||||
// create SHADOW index for each partition
|
||||
List<Tablet> addedTablets = Lists.newArrayList();
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
long partitionId = partition.getId();
|
||||
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
|
||||
@ -1089,24 +1090,54 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
MaterializedIndex shadowIndex = new MaterializedIndex(shadowIndexId, IndexState.SHADOW);
|
||||
MaterializedIndex originIndex = partition.getIndex(originIndexId);
|
||||
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, newSchemaHash, medium);
|
||||
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId);
|
||||
for (Tablet originTablet : originIndex.getTablets()) {
|
||||
long originTabletId = originTablet.getId();
|
||||
long shadowTabletId = catalog.getNextId();
|
||||
|
||||
Tablet shadowTablet = new Tablet(shadowTabletId);
|
||||
shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
|
||||
addedTablets.add(shadowTablet);
|
||||
|
||||
schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
|
||||
List<Replica> originReplicas = originTablet.getReplicas();
|
||||
|
||||
int healthyReplicaNum = 0;
|
||||
for (Replica originReplica : originReplicas) {
|
||||
long shadowReplicaId = catalog.getNextId();
|
||||
long backendId = originReplica.getBackendId();
|
||||
Preconditions.checkState(originReplica.getState() == ReplicaState.NORMAL);
|
||||
|
||||
if (originReplica.getState() == Replica.ReplicaState.CLONE
|
||||
|| originReplica.getState() == Replica.ReplicaState.DECOMMISSION
|
||||
|| originReplica.getLastFailedVersion() > 0) {
|
||||
LOG.info("origin replica {} of tablet {} state is {}, and last failed version is {}, skip creating shadow replica",
|
||||
originReplica.getId(), originReplica, originReplica.getState(), originReplica.getLastFailedVersion());
|
||||
continue;
|
||||
}
|
||||
Preconditions.checkState(originReplica.getState() == ReplicaState.NORMAL, originReplica.getState());
|
||||
// replica's init state is ALTER, so that tablet report process will ignore its report
|
||||
Replica shadowReplica = new Replica(shadowReplicaId, backendId, ReplicaState.ALTER,
|
||||
Partition.PARTITION_INIT_VERSION, Partition.PARTITION_INIT_VERSION_HASH,
|
||||
newSchemaHash);
|
||||
shadowTablet.addReplica(shadowReplica);
|
||||
healthyReplicaNum++;
|
||||
}
|
||||
|
||||
if (healthyReplicaNum < replicationNum / 2 + 1) {
|
||||
/*
|
||||
* TODO(cmy): This is a bad design.
|
||||
* Because in the schema change job, we will only send tasks to the shadow replicas that have been created,
|
||||
* without checking whether the quorum of replica number are satisfied.
|
||||
* This will cause the job to fail until we find that the quorum of replica number
|
||||
* is not satisfied until the entire job is done.
|
||||
* So here we check the replica number strictly and do not allow to submit the job
|
||||
* if the quorum of replica number is not satisfied.
|
||||
*/
|
||||
for (Tablet tablet : addedTablets) {
|
||||
Catalog.getCurrentInvertedIndex().deleteTablet(tablet.getId());
|
||||
}
|
||||
throw new DdlException(
|
||||
"tablet " + originTabletId + " has few healthy replica: " + healthyReplicaNum);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user