diff --git a/pkg/ddl/db_integration_test.go b/pkg/ddl/db_integration_test.go index 2ec36c1230..17e5e730ec 100644 --- a/pkg/ddl/db_integration_test.go +++ b/pkg/ddl/db_integration_test.go @@ -140,6 +140,29 @@ func TestModifyColumnAfterAddIndex(t *testing.T) { tk.MustExec(`insert into city values ("abc"), ("abd");`) } +func TestModifyColumnOldColumnIDNotFound(t *testing.T) { + store := testkit.CreateMockStore(t, mockstore.WithDDLChecker()) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);") + model.SetJobVerInUse(model.JobVersion1) + mockOwnerChange := false + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterRunOneJobStep", func(job *model.Job) { + if job.Type == model.ActionModifyColumn && + job.SchemaState == model.StateWriteReorganization && + !mockOwnerChange { + // Mock the first few phases of this DDL job is executed in old version TiDB. + model.UpdateJobArgsForTest(job, func(args []any) []any { + return args[:len(args)-1] + }) + mockOwnerChange = true + } + }) + tk.MustExec("alter table t modify column a tinyint;") + require.True(t, mockOwnerChange) +} + func TestIssue2293(t *testing.T) { store := testkit.CreateMockStore(t, mockstore.WithDDLChecker()) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index c1528c4c74..c3f44a836b 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -6962,20 +6962,18 @@ func (e *executor) RefreshMeta(sctx sessionctx.Context, args *model.RefreshMetaA } func getScatterScopeFromSessionctx(sctx sessionctx.Context) string { - var scatterScope string - val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion) - if !ok { - logutil.DDLLogger().Info("won't scatter region since system variable didn't set") - } else { - scatterScope = val + if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion); ok { + return val } - return scatterScope + logutil.DDLLogger().Info("system variable tidb_scatter_region not found, use default value") + return vardef.DefTiDBScatterRegion } func getEnableDDLAnalyze(sctx sessionctx.Context) string { if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBEnableDDLAnalyze); ok { return val } + logutil.DDLLogger().Info("system variable tidb_enable_ddl_analyze not found, use default value") return variable.BoolToOnOff(vardef.DefTiDBEnableDDLAnalyze) } @@ -6983,5 +6981,6 @@ func getAnalyzeVersion(sctx sessionctx.Context) string { if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBAnalyzeVersion); ok { return val } + logutil.DDLLogger().Info("system variable tidb_analyze_version not found, use default value") return strconv.Itoa(vardef.DefTiDBAnalyzeVersion) } diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index cff14b2d18..bcf7e4af0f 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -139,6 +139,12 @@ func (c *jobContext) cleanStepCtx() { c.stepCtx = nil // unset stepCtx for the next step initialization } +// genReorgTimeoutErr generates a reorganization timeout error. +func (c *jobContext) genReorgTimeoutErr() error { + c.reorgTimeoutOccurred = true + return dbterror.ErrWaitReorgTimeout +} + func (c *jobContext) getAutoIDRequirement() autoid.Requirement { return &asAutoIDRequirement{ store: c.store, diff --git a/pkg/ddl/modify_column.go b/pkg/ddl/modify_column.go index 9c22ce2531..7a43d202a6 100644 --- a/pkg/ddl/modify_column.go +++ b/pkg/ddl/modify_column.go @@ -234,12 +234,20 @@ func getModifyColumnInfo( if args.OldColumnID > 0 { oldCol = model.FindColumnInfoByID(tblInfo.Columns, args.OldColumnID) } else { - oldCol = model.FindColumnInfo(tblInfo.Columns, args.OldColumnName.L) + // Lower version TiDB doesn't persist the old column ID to job arguments. + // We have to use the old column name to locate the old column. + oldCol = model.FindColumnInfo(tblInfo.Columns, getRemovingObjName(args.OldColumnName.L)) + if oldCol == nil { + // The old column maybe not in removing state. + oldCol = model.FindColumnInfo(tblInfo.Columns, args.OldColumnName.L) + } if oldCol != nil { args.OldColumnID = oldCol.ID - logutil.DDLLogger().Info("run modify column job, init old column id", + logutil.DDLLogger().Info("run modify column job, find old column by name", + zap.Int64("jobID", job.ID), zap.String("oldColumnName", args.OldColumnName.L), - zap.Int64("oldColumnID", oldCol.ID)) + zap.Int64("oldColumnID", oldCol.ID), + ) } } if oldCol == nil { diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index c1c581a776..51cec513aa 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -400,8 +400,7 @@ func (w *worker) runReorgJob( logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry", zap.Int64("prevTS", res.ownerTS), zap.Int64("curTS", curTS)) - jobCtx.reorgTimeoutOccurred = true - return dbterror.ErrWaitReorgTimeout + return jobCtx.genReorgTimeoutErr() } // Since job is cancelled,we don't care about its partial counts. // TODO(lance6716): should we also do for paused job? @@ -440,8 +439,7 @@ func (w *worker) runReorgJob( w.mergeWarningsIntoJob(job) rc.resetWarnings() - jobCtx.reorgTimeoutOccurred = true - return dbterror.ErrWaitReorgTimeout + return jobCtx.genReorgTimeoutErr() } } } diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index d27c0ccc6e..c462f318b6 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -528,6 +528,13 @@ func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) { return rawArgs, errors.Trace(err) } +// UpdateJobArgsForTest updates job.args with the given update function. +func UpdateJobArgsForTest(job *Job, update func(args []any) []any) { + if intest.InTest { + job.args = update(job.args) + } +} + // Encode encodes job with json format. // updateRawArgs is used to determine whether to update the raw args. func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {