ddl: fix a bug when drop a column with single index (#32211)

close pingcap/tidb#32079
This commit is contained in:
wjHuang
2022-02-16 13:37:39 +08:00
committed by GitHub
parent a4b5190e88
commit a97fe372ae
2 changed files with 77 additions and 48 deletions

View File

@ -450,24 +450,6 @@ func onDropColumns(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateWriteOnly:
// write only -> delete only
setColumnsState(colInfos, model.StateDeleteOnly)
setIndicesState(idxInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setColumnsState(colInfos, model.StateDeleteReorganization)
setIndicesState(idxInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
if len(idxInfos) > 0 {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
@ -477,8 +459,23 @@ func onDropColumns(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
tblInfo.Indices = newIndices
}
indexIDs := indexInfosToIDList(idxInfos)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setColumnsState(colInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-delCount]
setColumnsState(colInfos, model.StateNone)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
@ -491,7 +488,7 @@ func onDropColumns(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, indexIDs, getPartitionIDs(tblInfo))
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = errInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
@ -508,7 +505,9 @@ func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.
var colNames []model.CIStr
var ifExists []bool
err = job.DecodeArgs(&colNames, &ifExists)
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colNames, &ifExists, &indexIds)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
@ -540,6 +539,9 @@ func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.
indexInfos = append(indexInfos, idxInfos...)
}
job.Args = []interface{}{newColNames, newIfExists}
if len(indexIds) > 0 {
job.Args = append(job.Args, indexIds)
}
return tblInfo, colInfos, len(colInfos), indexInfos, nil
}
@ -589,24 +591,6 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateWriteOnly:
// write only -> delete only
colInfo.State = model.StateDeleteOnly
setIndicesState(idxInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
setIndicesState(idxInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
if len(idxInfos) > 0 {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
@ -616,8 +600,23 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
tblInfo.Indices = newIndices
}
indexIDs := indexInfosToIDList(idxInfos)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
@ -631,7 +630,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
} else {
// We should set related index IDs for job
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, indexIDs, getPartitionIDs(tblInfo))
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = errInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
@ -647,7 +646,9 @@ func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Col
}
var colName model.CIStr
err = job.DecodeArgs(&colName)
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colName, &indexIds)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)

View File

@ -732,16 +732,19 @@ func (s *testStateChangeSuite) TestDeleteOnly(c *C) {
s.runTestInSchemaState(c, model.StateDeleteOnly, true, dropColumnSQL, sqls, query)
}
// TestDeleteOnlyForDropColumnWithIndexes test for delete data when a middle-state column with indexes in it.
func (s *testStateChangeSuite) TestDeleteOnlyForDropColumnWithIndexes(c *C) {
// TestSchemaChangeForDropColumnWithIndexes test for modify data when a middle-state column with indexes in it.
func (s *testStateChangeSuite) TestSchemaChangeForDropColumnWithIndexes(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db_state")
sqls := make([]sqlWithErr, 2)
sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{"delete from t1", nil}
sqls[1] = sqlWithErr{"delete from t1 where b=1", errors.Errorf("[planner:1054]Unknown column 'b' in 'where clause'")}
sqls[2] = sqlWithErr{"insert into t1(a) values(1);", nil}
sqls[3] = sqlWithErr{"update t1 set a = 2 where a=1;", nil}
sqls[4] = sqlWithErr{"delete from t1", nil}
prepare := func() {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int key, b int, c int, index idx(b));")
tk.MustExec("create table t1(a bigint unsigned not null primary key, b int, c int, index idx(b));")
tk.MustExec("insert into t1 values(1,1,1);")
}
prepare()
@ -754,6 +757,31 @@ func (s *testStateChangeSuite) TestDeleteOnlyForDropColumnWithIndexes(c *C) {
s.runTestInSchemaState(c, model.StateDeleteReorganization, true, dropColumnSQL, sqls, query)
}
// TestSchemaChangeForDropColumnWithIndexes test for modify data when some middle-state columns with indexes in it.
func (s *testStateChangeSuite) TestSchemaChangeForDropColumnsWithIndexes(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db_state")
sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{"delete from t1", nil}
sqls[1] = sqlWithErr{"delete from t1 where b=1", errors.Errorf("[planner:1054]Unknown column 'b' in 'where clause'")}
sqls[2] = sqlWithErr{"insert into t1(a) values(1);", nil}
sqls[3] = sqlWithErr{"update t1 set a = 2 where a=1;", nil}
sqls[4] = sqlWithErr{"delete from t1", nil}
prepare := func() {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a bigint unsigned not null primary key, b int, c int, d int, index idx(b), index idx2(d));")
tk.MustExec("insert into t1 values(1,1,1,1);")
}
prepare()
dropColumnSQL := "alter table t1 drop column b, drop column d"
query := &expectQuery{sql: "select * from t1;", rows: []string{}}
s.runTestInSchemaState(c, model.StateWriteOnly, true, dropColumnSQL, sqls, query)
prepare()
s.runTestInSchemaState(c, model.StateDeleteOnly, true, dropColumnSQL, sqls, query)
prepare()
s.runTestInSchemaState(c, model.StateDeleteReorganization, true, dropColumnSQL, sqls, query)
}
// TestDeleteOnlyForDropExpressionIndex tests for deleting data when the hidden column is delete-only state.
func (s *serialTestStateChangeSuite) TestDeleteOnlyForDropExpressionIndex(c *C) {
_, err := s.se.Execute(context.Background(), "use test_db_state")