ddl: use job args to pass runtime info for truncate/drop/recover table/partition (#56632)
ref pingcap/tidb#54436
This commit is contained in:
@ -404,15 +404,10 @@ func checkTableHasForeignKeyReferred(is infoschemactx.MetaOnlyInfoSchema, schema
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job) error {
|
||||
func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job, args *model.DropTableArgs) error {
|
||||
if !variable.EnableForeignKey.Load() {
|
||||
return nil
|
||||
}
|
||||
args, err := model.GetDropTableArgs(job)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return errors.Trace(err)
|
||||
}
|
||||
objectIdents, fkCheck := args.Identifiers, args.FKCheck
|
||||
referredFK, err := checkTableHasForeignKeyReferredInOwner(infoCache, job.SchemaName, job.TableName, objectIdents, fkCheck)
|
||||
if err != nil {
|
||||
|
||||
@ -2236,6 +2236,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
jobCtx.jobArgs = args
|
||||
partNames := args.PartNames
|
||||
metaMut := jobCtx.metaMut
|
||||
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
|
||||
@ -2378,7 +2379,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
|
||||
tblInfo.Partition.DDLState = model.StateNone
|
||||
tblInfo.Partition.DDLAction = model.ActionNone
|
||||
// used by ApplyDiff in updateSchemaVersion
|
||||
job.CtxVars = []any{physicalTableIDs} // TODO remove it.
|
||||
args.OldPhysicalTblIDs = physicalTableIDs
|
||||
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -2391,7 +2392,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
|
||||
)
|
||||
asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
|
||||
// A background job will be created to delete old partition data.
|
||||
args.OldPhysicalTblIDs = physicalTableIDs
|
||||
job.FillFinishedArgs(args)
|
||||
default:
|
||||
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
|
||||
@ -2424,6 +2424,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
jobCtx.jobArgs = args
|
||||
oldIDs, newIDs := args.OldPartitionIDs, args.NewPartitionIDs
|
||||
if len(oldIDs) != len(newIDs) {
|
||||
job.State = model.JobStateCancelled
|
||||
@ -2471,7 +2472,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
|
||||
|
||||
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)
|
||||
|
||||
job.CtxVars = []any{oldIDs, newIDs}
|
||||
args.ShouldUpdateAffectedPartitions = true
|
||||
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -2613,7 +2614,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
|
||||
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)
|
||||
|
||||
// used by ApplyDiff in updateSchemaVersion
|
||||
job.CtxVars = []any{oldIDs, newIDs}
|
||||
args.ShouldUpdateAffectedPartitions = true
|
||||
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
|
||||
@ -48,26 +48,15 @@ func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error
|
||||
}
|
||||
|
||||
// SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable.
|
||||
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error {
|
||||
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error {
|
||||
// Truncate table has two table ID, should be handled differently.
|
||||
args, err := model.GetTruncateTableArgs(job)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
args := jobCtx.jobArgs.(*model.TruncateTableArgs)
|
||||
diff.TableID = args.NewTableID
|
||||
diff.OldTableID = job.TableID
|
||||
|
||||
// affects are used to update placement rule cache
|
||||
if job.Version == model.JobVersion1 {
|
||||
if len(job.CtxVars) > 0 {
|
||||
oldIDs := job.CtxVars[0].([]int64)
|
||||
newIDs := job.CtxVars[1].([]int64)
|
||||
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
|
||||
}
|
||||
} else {
|
||||
if len(args.OldPartIDsWithPolicy) > 0 {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy)
|
||||
}
|
||||
if len(args.OldPartIDsWithPolicy) > 0 {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -170,23 +159,41 @@ func SetSchemaDiffForExchangeTablePartition(diff *model.SchemaDiff, job *model.J
|
||||
}
|
||||
|
||||
// SetSchemaDiffForTruncateTablePartition set SchemaDiff for ActionTruncateTablePartition.
|
||||
func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job) {
|
||||
func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
|
||||
diff.TableID = job.TableID
|
||||
if len(job.CtxVars) > 0 {
|
||||
oldIDs := job.CtxVars[0].([]int64)
|
||||
newIDs := job.CtxVars[1].([]int64)
|
||||
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
|
||||
args := jobCtx.jobArgs.(*model.TruncateTableArgs)
|
||||
if args.ShouldUpdateAffectedPartitions {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.NewPartitionIDs)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTablePartition, ActionRecoverTable, ActionDropTable.
|
||||
func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job) {
|
||||
// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTable.
|
||||
func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
|
||||
// affects are used to update placement rule cache
|
||||
diff.TableID = job.TableID
|
||||
if len(job.CtxVars) > 0 {
|
||||
if oldIDs, ok := job.CtxVars[0].([]int64); ok {
|
||||
diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs)
|
||||
}
|
||||
args := jobCtx.jobArgs.(*model.DropTableArgs)
|
||||
if len(args.OldPartitionIDs) > 0 {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.OldPartitionIDs)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSchemaDiffForDropTablePartition set SchemaDiff for ActionDropTablePartition.
|
||||
func SetSchemaDiffForDropTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
|
||||
// affects are used to update placement rule cache
|
||||
diff.TableID = job.TableID
|
||||
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
|
||||
if len(args.OldPhysicalTblIDs) > 0 {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.OldPhysicalTblIDs, args.OldPhysicalTblIDs)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSchemaDiffForRecoverTable set SchemaDiff for ActionRecoverTable.
|
||||
func SetSchemaDiffForRecoverTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
|
||||
// affects are used to update placement rule cache
|
||||
diff.TableID = job.TableID
|
||||
args := jobCtx.jobArgs.(*model.RecoverArgs)
|
||||
if len(args.AffectedPhysicalIDs) > 0 {
|
||||
diff.AffectedOpts = buildPlacementAffects(args.AffectedPhysicalIDs, args.AffectedPhysicalIDs)
|
||||
}
|
||||
}
|
||||
|
||||
@ -330,7 +337,7 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem
|
||||
case model.ActionCreateTables:
|
||||
err = SetSchemaDiffForCreateTables(diff, job)
|
||||
case model.ActionTruncateTable:
|
||||
err = SetSchemaDiffForTruncateTable(diff, job)
|
||||
err = SetSchemaDiffForTruncateTable(diff, job, jobCtx)
|
||||
case model.ActionCreateView:
|
||||
err = SetSchemaDiffForCreateView(diff, job)
|
||||
case model.ActionRenameTable:
|
||||
@ -340,9 +347,13 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem
|
||||
case model.ActionExchangeTablePartition:
|
||||
err = SetSchemaDiffForExchangeTablePartition(diff, job, multiInfos...)
|
||||
case model.ActionTruncateTablePartition:
|
||||
SetSchemaDiffForTruncateTablePartition(diff, job)
|
||||
case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable:
|
||||
SetSchemaDiffForDropTable(diff, job)
|
||||
SetSchemaDiffForTruncateTablePartition(diff, job, jobCtx)
|
||||
case model.ActionDropTablePartition:
|
||||
SetSchemaDiffForDropTablePartition(diff, job, jobCtx)
|
||||
case model.ActionRecoverTable:
|
||||
SetSchemaDiffForRecoverTable(diff, job, jobCtx)
|
||||
case model.ActionDropTable:
|
||||
SetSchemaDiffForDropTable(diff, job, jobCtx)
|
||||
case model.ActionReorganizePartition:
|
||||
SetSchemaDiffForReorganizePartition(diff, job)
|
||||
case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
|
||||
|
||||
@ -58,6 +58,12 @@ func repairTableOrViewWithCheck(t *meta.Mutator, job *model.Job, schemaID int64,
|
||||
}
|
||||
|
||||
func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
|
||||
args, err := model.GetDropTableArgs(job)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
jobCtx.jobArgs = args
|
||||
tblInfo, err := checkTableExistAndCancelNonExistJob(jobCtx.metaMut, job, job.SchemaID)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -68,7 +74,7 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error)
|
||||
case model.StatePublic:
|
||||
// public -> write only
|
||||
if job.Type == model.ActionDropTable {
|
||||
err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job)
|
||||
err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job, args)
|
||||
if err != nil {
|
||||
return ver, err
|
||||
}
|
||||
@ -89,8 +95,8 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error)
|
||||
tblInfo.State = model.StateNone
|
||||
oldIDs := getPartitionIDs(tblInfo)
|
||||
ruleIDs := append(getPartitionRuleIDs(job.SchemaName, tblInfo), fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L))
|
||||
job.CtxVars = []any{oldIDs}
|
||||
|
||||
args.OldPartitionIDs = oldIDs
|
||||
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -143,6 +149,7 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
jobCtx.jobArgs = args
|
||||
recoverInfo := args.RecoverTableInfos()[0]
|
||||
|
||||
schemaID := recoverInfo.SchemaID
|
||||
@ -228,6 +235,15 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
|
||||
tableInfo := tblInfo.Clone()
|
||||
tableInfo.State = model.StatePublic
|
||||
tableInfo.UpdateTS = metaMut.StartTS
|
||||
|
||||
var tids []int64
|
||||
if recoverInfo.TableInfo.GetPartitionInfo() != nil {
|
||||
tids = getPartitionIDs(recoverInfo.TableInfo)
|
||||
tids = append(tids, recoverInfo.TableInfo.ID)
|
||||
} else {
|
||||
tids = []int64{recoverInfo.TableInfo.ID}
|
||||
}
|
||||
args.AffectedPhysicalIDs = tids
|
||||
ver, err = updateVersionAndTableInfo(jobCtx, job, tableInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -243,13 +259,6 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
|
||||
}
|
||||
|
||||
func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *model.RecoverTableInfo) (ver int64, err error) {
|
||||
var tids []int64
|
||||
if recoverInfo.TableInfo.GetPartitionInfo() != nil {
|
||||
tids = getPartitionIDs(recoverInfo.TableInfo)
|
||||
tids = append(tids, recoverInfo.TableInfo.ID)
|
||||
} else {
|
||||
tids = []int64{recoverInfo.TableInfo.ID}
|
||||
}
|
||||
tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(recoverInfo.TableInfo, recoverInfo.OldSchemaName, recoverInfo.OldTableName)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
@ -288,8 +297,6 @@ func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *mode
|
||||
return ver, errors.Wrapf(err, "failed to update the label rule to PD")
|
||||
}
|
||||
|
||||
// TODO(joechenrh): tid is used in SerSchemaDiffForDropTable, remove this after refactor done.
|
||||
job.CtxVars = []any{tids}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
@ -433,6 +440,7 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
jobCtx.jobArgs = args
|
||||
metaMut := jobCtx.metaMut
|
||||
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, schemaID)
|
||||
if err != nil {
|
||||
@ -498,12 +506,8 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
|
||||
newIDs = append(newIDs, newID)
|
||||
}
|
||||
}
|
||||
if job.Version == model.JobVersion1 {
|
||||
job.CtxVars = []any{oldIDs, newIDs}
|
||||
} else {
|
||||
args.OldPartIDsWithPolicy = oldIDs
|
||||
args.NewPartIDsWithPolicy = newIDs
|
||||
}
|
||||
args.OldPartIDsWithPolicy = oldIDs
|
||||
args.NewPartIDsWithPolicy = newIDs
|
||||
}
|
||||
|
||||
tableRuleID, partRuleIDs, _, oldRules, err := getOldLabelRules(tblInfo, job.SchemaName, tblInfo.Name.L)
|
||||
|
||||
@ -283,7 +283,8 @@ func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) {
|
||||
// when dropping multiple objects, each object will have a separate job
|
||||
type DropTableArgs struct {
|
||||
// below fields are only for drop table.
|
||||
// when dropping multiple tables, the Identifiers is the same.
|
||||
// when dropping multiple tables, the Identifiers is the same, but each drop-table
|
||||
// runs in a separate job.
|
||||
Identifiers []ast.Ident `json:"identifiers,omitempty"`
|
||||
FKCheck bool `json:"fk_check,omitempty"`
|
||||
|
||||
@ -306,8 +307,10 @@ func (a *DropTableArgs) getFinishedArgsV1(*Job) []any {
|
||||
}
|
||||
|
||||
func (a *DropTableArgs) decodeV1(job *Job) error {
|
||||
intest.Assert(job.Type == ActionDropTable, "only drop table job can call GetDropTableArgs")
|
||||
return job.DecodeArgs(&a.Identifiers, &a.FKCheck)
|
||||
if job.Type == ActionDropTable {
|
||||
return job.DecodeArgs(&a.Identifiers, &a.FKCheck)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDropTableArgs gets the drop-table args.
|
||||
@ -343,8 +346,9 @@ type TruncateTableArgs struct {
|
||||
OldPartitionIDs []int64 `json:"old_partition_ids,omitempty"`
|
||||
|
||||
// context vars
|
||||
NewPartIDsWithPolicy []int64 `json:"-"`
|
||||
OldPartIDsWithPolicy []int64 `json:"-"`
|
||||
NewPartIDsWithPolicy []int64 `json:"-"`
|
||||
OldPartIDsWithPolicy []int64 `json:"-"`
|
||||
ShouldUpdateAffectedPartitions bool `json:"-"`
|
||||
}
|
||||
|
||||
func (a *TruncateTableArgs) getArgsV1(job *Job) []any {
|
||||
@ -1103,6 +1107,9 @@ func GetAlterTableAttributesArgs(job *Job) (*AlterTableAttributesArgs, error) {
|
||||
type RecoverArgs struct {
|
||||
RecoverInfo *RecoverSchemaInfo `json:"recover_info,omitempty"`
|
||||
CheckFlag int64 `json:"check_flag,omitempty"`
|
||||
|
||||
// used during runtime
|
||||
AffectedPhysicalIDs []int64 `json:"-"`
|
||||
}
|
||||
|
||||
func (a *RecoverArgs) getArgsV1(job *Job) []any {
|
||||
|
||||
Reference in New Issue
Block a user