ddl: extract data reorg code to separate functions (#33679)

ref pingcap/tidb#14766
This commit is contained in:
tangenta
2022-04-02 17:54:29 +08:00
committed by GitHub
parent 3388915388
commit 11ca0d4af2
2 changed files with 95 additions and 75 deletions

View File

@ -1009,51 +1009,12 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return ver, errors.Trace(err)
var done bool
done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
if !done {
return ver, err
}
// Inject a failpoint so that we can pause here and do verification on other components.
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name)
}, false)
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
if kv.IsTxnRetryableError(err) {
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
oldIdxIDs := getOldIndexIDs(tblInfo, oldCol) // used by GC delete range.
err = adjustTableInfoAfterModifyColumnWithData(tblInfo, pos, oldCol, changingCol, colName, changingIdxs)
@ -1080,6 +1041,55 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
}
// Inject a failpoint so that we can pause here and do verification on other components.
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
}, false)
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.IsTxnRetryableError(err) {
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, nil
}
func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast.ColumnPosition,
oldCol, changingCol *model.ColumnInfo, newName model.CIStr, changingIdxs []*model.IndexInfo) (err error) {
if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L {

View File

@ -557,40 +557,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return ver, errors.Trace(err)
var done bool
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if !done {
return ver, err
}
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name)
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// if timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err)
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
}
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
@ -612,6 +584,44 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
}
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name)
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// if timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(t, job, tbl.Meta(), indexInfo, err)
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
}
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, errors.Trace(err)
}
func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
if err != nil {