ddl: support drop partition for label rules (#26795)
This commit is contained in:
@ -181,13 +181,15 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode
|
||||
}
|
||||
|
||||
// rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo.
|
||||
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) []int64 {
|
||||
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string) {
|
||||
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
|
||||
partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions))
|
||||
for _, one := range tblInfo.Partition.AddingDefinitions {
|
||||
physicalTableIDs = append(physicalTableIDs, one.ID)
|
||||
partNames = append(partNames, one.Name.L)
|
||||
}
|
||||
tblInfo.Partition.AddingDefinitions = nil
|
||||
return physicalTableIDs
|
||||
return physicalTableIDs, partNames
|
||||
}
|
||||
|
||||
// checkAddPartitionValue values less than value must be strictly increasing for each partition.
|
||||
@ -924,6 +926,16 @@ func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func dropLabelRules(d *ddlCtx, schemaName, tableName string, partNames []string) error {
|
||||
deleteRules := make([]string, 0, len(partNames))
|
||||
for _, partName := range partNames {
|
||||
deleteRules = append(deleteRules, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, schemaName, tableName, partName))
|
||||
}
|
||||
// delete batch rules
|
||||
patch := label.NewRulePatch(nil, deleteRules)
|
||||
return infosync.UpdateLabelRules(context.TODO(), patch)
|
||||
}
|
||||
|
||||
// onDropTablePartition deletes old partition meta.
|
||||
func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
|
||||
var partNames []string
|
||||
@ -937,13 +949,19 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
|
||||
}
|
||||
var physicalTableIDs []int64
|
||||
if job.Type == model.ActionAddTablePartition {
|
||||
var pNames []string
|
||||
// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
|
||||
physicalTableIDs = rollbackAddingPartitionInfo(tblInfo)
|
||||
physicalTableIDs, pNames = rollbackAddingPartitionInfo(tblInfo)
|
||||
err = dropRuleBundles(d, physicalTableIDs)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
|
||||
}
|
||||
err = dropLabelRules(d, job.SchemaName, tblInfo.Name.L, pNames)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
|
||||
}
|
||||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -974,6 +992,11 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
|
||||
}
|
||||
err = dropLabelRules(d, job.SchemaName, tblInfo.Name.L, partNames)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
|
||||
}
|
||||
job.SchemaState = model.StateDeleteOnly
|
||||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != job.SchemaState)
|
||||
case model.StateDeleteOnly:
|
||||
|
||||
Reference in New Issue
Block a user