*: drop database should check foreign key constraint (#38006)

close pingcap/tidb#38005
This commit is contained in:
crazycs
2022-09-21 15:29:02 +08:00
committed by GitHub
parent f41d8d3d98
commit ab56dbef83
4 changed files with 122 additions and 4 deletions

View File

@ -593,12 +593,18 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (er
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
fkCheck := ctx.GetSessionVars().ForeignKeyChecks
err = checkDatabaseHasForeignKeyReferred(is, old.Name, fkCheck)
if err != nil {
return err
}
job := &model.Job{
SchemaID: old.ID,
SchemaName: old.Name.L,
SchemaState: old.State,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkCheck},
}
err = d.DoDDLJob(ctx, job)

View File

@ -549,3 +549,44 @@ func (h *foreignKeyHelper) getTableFromStorage(is infoschema.InfoSchema, t *meta
h.loaded[k] = result
return result, nil
}
func checkDatabaseHasForeignKeyReferred(is infoschema.InfoSchema, schema model.CIStr, fkCheck bool) error {
if !fkCheck {
return nil
}
tables := is.SchemaTables(schema)
tableNames := make([]ast.Ident, len(tables))
for i := range tables {
tableNames[i] = ast.Ident{Schema: schema, Name: tables[i].Meta().Name}
}
for _, tbl := range tables {
if referredFK := checkTableHasForeignKeyReferred(is, schema.L, tbl.Meta().Name.L, tableNames, fkCheck); referredFK != nil {
return errors.Trace(dbterror.ErrForeignKeyCannotDropParent.GenWithStackByArgs(tbl.Meta().Name, referredFK.ChildFKName, referredFK.ChildTable))
}
}
return nil
}
func checkDatabaseHasForeignKeyReferredInOwner(d *ddlCtx, t *meta.Meta, job *model.Job) error {
if !variable.EnableForeignKey.Load() {
return nil
}
var fkCheck bool
err := job.DecodeArgs(&fkCheck)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
if !fkCheck {
return nil
}
is, err := getAndCheckLatestInfoSchema(d, t)
if err != nil {
return errors.Trace(err)
}
err = checkDatabaseHasForeignKeyReferred(is, model.NewCIStr(job.SchemaName), fkCheck)
if err != nil {
job.State = model.JobStateCancelled
}
return errors.Trace(err)
}

View File

@ -946,10 +946,6 @@ func TestTruncateOrDropTableWithForeignKeyReferred2(t *testing.T) {
tk2.MustExec("set @@global.tidb_enable_foreign_key=1")
tk2.MustExec("set @@foreign_key_checks=1;")
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("set @@global.tidb_enable_foreign_key=1")
tk3.MustExec("set @@foreign_key_checks=1;")
tk3.MustExec("use test")
tk.MustExec("create table t1 (id int key, a int);")
@ -1256,3 +1252,74 @@ func TestRenameColumnWithForeignKeyMetaInfo(t *testing.T) {
" CONSTRAINT `fk_2` FOREIGN KEY (`bb`) REFERENCES `t1` (`bb`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}
func TestDropDatabaseWithForeignKeyReferred(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int, index(b));")
tk.MustExec("create table t2 (id int key, b int, foreign key fk_b(b) references t1(id));")
tk.MustExec("create database test2")
tk.MustExec("create table test2.t3 (id int key, b int, foreign key fk_b(b) references test.t2(id));")
err := tk.ExecToErr("drop database test;")
require.Error(t, err)
require.Equal(t, "[ddl:3730]Cannot drop table 't2' referenced by a foreign key constraint 'fk_b' on table 't3'.", err.Error())
tk.MustExec("set @@foreign_key_checks=0;")
tk.MustExec("drop database test")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("create database test")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int, index(b));")
tk.MustExec("create table t2 (id int key, b int, foreign key fk_b(b) references t1(id));")
err = tk.ExecToErr("drop database test;")
require.Error(t, err)
require.Equal(t, "[ddl:3730]Cannot drop table 't2' referenced by a foreign key constraint 'fk_b' on table 't3'.", err.Error())
tk.MustExec("drop table test2.t3")
tk.MustExec("drop database test")
}
func TestDropDatabaseWithForeignKeyReferred2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := dom.DDL()
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("set @@global.tidb_enable_foreign_key=1")
tk2.MustExec("set @@foreign_key_checks=1;")
tk2.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int, index(b));")
tk.MustExec("create table t2 (id int key, b int, foreign key fk_b(b) references t1(id));")
tk.MustExec("create database test2")
var wg sync.WaitGroup
var dropErr error
tc := &ddl.TestDDLCallback{}
tc.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StateNone {
return
}
if job.Type != model.ActionCreateTable {
return
}
wg.Add(1)
go func() {
defer wg.Done()
dropErr = tk2.ExecToErr("drop database test")
}()
// make sure tk2's ddl job already put into ddl job queue.
time.Sleep(time.Millisecond * 100)
}
originalHook := d.GetHook()
defer d.SetHook(originalHook)
d.SetHook(tc)
tk.MustExec("create table test2.t3 (id int key, b int, foreign key fk_b(b) references test.t2(id));")
wg.Wait()
require.Error(t, dropErr)
require.Equal(t, "[ddl:3730]Cannot drop table 't2' referenced by a foreign key constraint 'fk_b' on table 't3'.", dropErr.Error())
}

View File

@ -190,6 +190,10 @@ func onDropSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
switch dbInfo.State {
case model.StatePublic:
// public -> write only
err = checkDatabaseHasForeignKeyReferredInOwner(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
dbInfo.State = model.StateWriteOnly
err = t.UpdateDatabase(dbInfo)
if err != nil {