ddl: find old column by name for old version modify column job (#63901)
close pingcap/tidb#63895
This commit is contained in:
@ -140,6 +140,29 @@ func TestModifyColumnAfterAddIndex(t *testing.T) {
|
||||
tk.MustExec(`insert into city values ("abc"), ("abd");`)
|
||||
}
|
||||
|
||||
func TestModifyColumnOldColumnIDNotFound(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t, mockstore.WithDDLChecker())
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("create table t (a int, b int);")
|
||||
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
|
||||
model.SetJobVerInUse(model.JobVersion1)
|
||||
mockOwnerChange := false
|
||||
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterRunOneJobStep", func(job *model.Job) {
|
||||
if job.Type == model.ActionModifyColumn &&
|
||||
job.SchemaState == model.StateWriteReorganization &&
|
||||
!mockOwnerChange {
|
||||
// Mock the first few phases of this DDL job is executed in old version TiDB.
|
||||
model.UpdateJobArgsForTest(job, func(args []any) []any {
|
||||
return args[:len(args)-1]
|
||||
})
|
||||
mockOwnerChange = true
|
||||
}
|
||||
})
|
||||
tk.MustExec("alter table t modify column a tinyint;")
|
||||
require.True(t, mockOwnerChange)
|
||||
}
|
||||
|
||||
func TestIssue2293(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t, mockstore.WithDDLChecker())
|
||||
|
||||
|
||||
@ -6962,20 +6962,18 @@ func (e *executor) RefreshMeta(sctx sessionctx.Context, args *model.RefreshMetaA
|
||||
}
|
||||
|
||||
func getScatterScopeFromSessionctx(sctx sessionctx.Context) string {
|
||||
var scatterScope string
|
||||
val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion)
|
||||
if !ok {
|
||||
logutil.DDLLogger().Info("won't scatter region since system variable didn't set")
|
||||
} else {
|
||||
scatterScope = val
|
||||
if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion); ok {
|
||||
return val
|
||||
}
|
||||
return scatterScope
|
||||
logutil.DDLLogger().Info("system variable tidb_scatter_region not found, use default value")
|
||||
return vardef.DefTiDBScatterRegion
|
||||
}
|
||||
|
||||
func getEnableDDLAnalyze(sctx sessionctx.Context) string {
|
||||
if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBEnableDDLAnalyze); ok {
|
||||
return val
|
||||
}
|
||||
logutil.DDLLogger().Info("system variable tidb_enable_ddl_analyze not found, use default value")
|
||||
return variable.BoolToOnOff(vardef.DefTiDBEnableDDLAnalyze)
|
||||
}
|
||||
|
||||
@ -6983,5 +6981,6 @@ func getAnalyzeVersion(sctx sessionctx.Context) string {
|
||||
if val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBAnalyzeVersion); ok {
|
||||
return val
|
||||
}
|
||||
logutil.DDLLogger().Info("system variable tidb_analyze_version not found, use default value")
|
||||
return strconv.Itoa(vardef.DefTiDBAnalyzeVersion)
|
||||
}
|
||||
|
||||
@ -139,6 +139,12 @@ func (c *jobContext) cleanStepCtx() {
|
||||
c.stepCtx = nil // unset stepCtx for the next step initialization
|
||||
}
|
||||
|
||||
// genReorgTimeoutErr generates a reorganization timeout error.
|
||||
func (c *jobContext) genReorgTimeoutErr() error {
|
||||
c.reorgTimeoutOccurred = true
|
||||
return dbterror.ErrWaitReorgTimeout
|
||||
}
|
||||
|
||||
func (c *jobContext) getAutoIDRequirement() autoid.Requirement {
|
||||
return &asAutoIDRequirement{
|
||||
store: c.store,
|
||||
|
||||
@ -234,12 +234,20 @@ func getModifyColumnInfo(
|
||||
if args.OldColumnID > 0 {
|
||||
oldCol = model.FindColumnInfoByID(tblInfo.Columns, args.OldColumnID)
|
||||
} else {
|
||||
oldCol = model.FindColumnInfo(tblInfo.Columns, args.OldColumnName.L)
|
||||
// Lower version TiDB doesn't persist the old column ID to job arguments.
|
||||
// We have to use the old column name to locate the old column.
|
||||
oldCol = model.FindColumnInfo(tblInfo.Columns, getRemovingObjName(args.OldColumnName.L))
|
||||
if oldCol == nil {
|
||||
// The old column maybe not in removing state.
|
||||
oldCol = model.FindColumnInfo(tblInfo.Columns, args.OldColumnName.L)
|
||||
}
|
||||
if oldCol != nil {
|
||||
args.OldColumnID = oldCol.ID
|
||||
logutil.DDLLogger().Info("run modify column job, init old column id",
|
||||
logutil.DDLLogger().Info("run modify column job, find old column by name",
|
||||
zap.Int64("jobID", job.ID),
|
||||
zap.String("oldColumnName", args.OldColumnName.L),
|
||||
zap.Int64("oldColumnID", oldCol.ID))
|
||||
zap.Int64("oldColumnID", oldCol.ID),
|
||||
)
|
||||
}
|
||||
}
|
||||
if oldCol == nil {
|
||||
|
||||
@ -400,8 +400,7 @@ func (w *worker) runReorgJob(
|
||||
logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
|
||||
zap.Int64("prevTS", res.ownerTS),
|
||||
zap.Int64("curTS", curTS))
|
||||
jobCtx.reorgTimeoutOccurred = true
|
||||
return dbterror.ErrWaitReorgTimeout
|
||||
return jobCtx.genReorgTimeoutErr()
|
||||
}
|
||||
// Since job is cancelled,we don't care about its partial counts.
|
||||
// TODO(lance6716): should we also do for paused job?
|
||||
@ -440,8 +439,7 @@ func (w *worker) runReorgJob(
|
||||
w.mergeWarningsIntoJob(job)
|
||||
|
||||
rc.resetWarnings()
|
||||
jobCtx.reorgTimeoutOccurred = true
|
||||
return dbterror.ErrWaitReorgTimeout
|
||||
return jobCtx.genReorgTimeoutErr()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -528,6 +528,13 @@ func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) {
|
||||
return rawArgs, errors.Trace(err)
|
||||
}
|
||||
|
||||
// UpdateJobArgsForTest updates job.args with the given update function.
|
||||
func UpdateJobArgsForTest(job *Job, update func(args []any) []any) {
|
||||
if intest.InTest {
|
||||
job.args = update(job.args)
|
||||
}
|
||||
}
|
||||
|
||||
// Encode encodes job with json format.
|
||||
// updateRawArgs is used to determine whether to update the raw args.
|
||||
func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
|
||||
|
||||
Reference in New Issue
Block a user