From 858b8a38ea35e9eaeff29ab03e7d5e32fdfb5412 Mon Sep 17 00:00:00 2001 From: crazycs Date: Sat, 24 Sep 2022 00:41:43 +0800 Subject: [PATCH] *: add foreign key constraint check when execute insert statement (#37466) close pingcap/tidb#37465 --- errors.toml | 10 + executor/adapter.go | 19 ++ executor/builder.go | 5 + executor/foreign_key.go | 489 +++++++++++++++++++++++++++++++++++ executor/foreign_key_test.go | 474 +++++++++++++++++++++++++++++++++ executor/insert.go | 7 +- executor/insert_common.go | 17 ++ executor/update.go | 2 +- executor/write.go | 8 +- kv/option.go | 2 + parser/model/model.go | 4 +- planner/core/common_plans.go | 2 + planner/core/errors.go | 10 +- planner/core/foreign_key.go | 176 +++++++++++++ planner/core/planbuilder.go | 4 + store/driver/txn/snapshot.go | 5 + 16 files changed, 1225 insertions(+), 9 deletions(-) create mode 100644 executor/foreign_key.go create mode 100644 executor/foreign_key_test.go create mode 100644 planner/core/foreign_key.go diff --git a/errors.toml b/errors.toml index 221ba2fe09..d2df27fc86 100644 --- a/errors.toml +++ b/errors.toml @@ -1936,6 +1936,16 @@ error = ''' Key part '%-.192s' length cannot be 0 ''' +["planner:1451"] +error = ''' +Cannot delete or update a parent row: a foreign key constraint fails (%.192s) +''' + +["planner:1452"] +error = ''' +Cannot add or update a child row: a foreign key constraint fails (%.192s) +''' + ["planner:1462"] error = ''' `%-.192s`.`%-.192s` contains view recursion diff --git a/executor/adapter.go b/executor/adapter.go index a2301b74b6..857e874fb4 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -536,6 +536,10 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { } if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled || err != nil { + if err != nil { + return result, err + } + err = a.handleForeignKeyTrigger(ctx, e) return result, err } @@ -555,6 +559,21 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { }, nil } +func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor) error { + exec, ok := e.(WithForeignKeyTrigger) + if !ok { + return nil + } + fkChecks := exec.GetFKChecks() + for _, fkCheck := range fkChecks { + err := fkCheck.doCheck(ctx) + if err != nil { + return err + } + } + return nil +} + func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic bool) (handled bool, rs sqlexec.RecordSet, err error) { sc := a.Ctx.GetSessionVars().StmtCtx defer func() { diff --git a/executor/builder.go b/executor/builder.go index 08b4b6eaff..efbaf0fab9 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -888,6 +888,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { b.err = err return nil } + ivs.fkChecks, err = buildFKCheckExecs(b.ctx, ivs.Table, v.FKChecks) + if err != nil { + b.err = err + return nil + } if v.IsReplace { return b.buildReplace(ivs) diff --git a/executor/foreign_key.go b/executor/foreign_key.go new file mode 100644 index 0000000000..ee49dd06f7 --- /dev/null +++ b/executor/foreign_key.go @@ -0,0 +1,489 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "sync/atomic" + + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/set" + "github.com/tikv/client-go/v2/txnkv/txnsnapshot" +) + +// WithForeignKeyTrigger indicates the executor has foreign key check or cascade. +type WithForeignKeyTrigger interface { + GetFKChecks() []*FKCheckExec +} + +// FKCheckExec uses to check foreign key constraint. +// When insert/update child table, need to check the row has related row exists in refer table. +// When insert/update parent table, need to check the row doesn't have related row exists in refer table. +type FKCheckExec struct { + *plannercore.FKCheck + *fkValueHelper + ctx sessionctx.Context + + toBeCheckedKeys []kv.Key + toBeCheckedPrefixKeys []kv.Key + toBeLockedKeys []kv.Key + + checkRowsCache map[string]bool +} + +func buildFKCheckExecs(sctx sessionctx.Context, tbl table.Table, fkChecks []*plannercore.FKCheck) ([]*FKCheckExec, error) { + fkCheckExecs := make([]*FKCheckExec, 0, len(fkChecks)) + for _, fkCheck := range fkChecks { + fkCheckExec, err := buildFKCheckExec(sctx, tbl, fkCheck) + if err != nil { + return nil, err + } + if fkCheckExec != nil { + fkCheckExecs = append(fkCheckExecs, fkCheckExec) + } + } + return fkCheckExecs, nil +} + +func buildFKCheckExec(sctx sessionctx.Context, tbl table.Table, fkCheck *plannercore.FKCheck) (*FKCheckExec, error) { + var cols []model.CIStr + if fkCheck.FK != nil { + cols = fkCheck.FK.Cols + } else if fkCheck.ReferredFK != nil { + cols = fkCheck.ReferredFK.Cols + } + colsOffsets, err := getFKColumnsOffsets(tbl.Meta(), cols) + if err != nil { + return nil, err + } + helper := &fkValueHelper{ + colsOffsets: colsOffsets, + fkValuesSet: set.NewStringSet(), + } + return &FKCheckExec{ + ctx: sctx, + FKCheck: fkCheck, + fkValueHelper: helper, + }, nil +} + +func (fkc *FKCheckExec) insertRowNeedToCheck(sc *stmtctx.StatementContext, row []types.Datum) error { + return fkc.addRowNeedToCheck(sc, row) +} + +func (fkc *FKCheckExec) updateRowNeedToCheck(sc *stmtctx.StatementContext, oldRow, newRow []types.Datum) error { + if fkc.FK != nil { + return fkc.addRowNeedToCheck(sc, newRow) + } else if fkc.ReferredFK != nil { + return fkc.addRowNeedToCheck(sc, oldRow) + } + return nil +} + +func (fkc *FKCheckExec) addRowNeedToCheck(sc *stmtctx.StatementContext, row []types.Datum) error { + vals, err := fkc.fetchFKValuesWithCheck(sc, row) + if err != nil || len(vals) == 0 { + return err + } + key, isPrefix, err := fkc.buildCheckKeyFromFKValue(sc, vals) + if err != nil { + return err + } + if isPrefix { + fkc.toBeCheckedPrefixKeys = append(fkc.toBeCheckedPrefixKeys, key) + } else { + fkc.toBeCheckedKeys = append(fkc.toBeCheckedKeys, key) + } + return nil +} + +func (fkc *FKCheckExec) doCheck(ctx context.Context) error { + txn, err := fkc.ctx.Txn(false) + if err != nil { + return err + } + err = fkc.checkKeys(ctx, txn) + if err != nil { + return err + } + err = fkc.checkIndexKeys(ctx, txn) + if err != nil { + return err + } + if len(fkc.toBeLockedKeys) == 0 { + return nil + } + sessVars := fkc.ctx.GetSessionVars() + lockCtx, err := newLockCtx(fkc.ctx, sessVars.LockWaitTimeout, len(fkc.toBeLockedKeys)) + if err != nil { + return err + } + // WARN: Since tidb current doesn't support `LOCK IN SHARE MODE`, therefore, performance will be very poor in concurrency cases. + // TODO(crazycs520):After TiDB support `LOCK IN SHARE MODE`, use `LOCK IN SHARE MODE` here. + forUpdate := atomic.LoadUint32(&sessVars.TxnCtx.ForUpdate) + err = doLockKeys(ctx, fkc.ctx, lockCtx, fkc.toBeLockedKeys...) + // doLockKeys may set TxnCtx.ForUpdate to 1, then if the lock meet write conflict, TiDB can't retry for update. + // So reset TxnCtx.ForUpdate to 0 then can be retry if meet write conflict. + atomic.StoreUint32(&sessVars.TxnCtx.ForUpdate, forUpdate) + return err +} + +func (fkc *FKCheckExec) buildCheckKeyFromFKValue(sc *stmtctx.StatementContext, vals []types.Datum) (key kv.Key, isPrefix bool, err error) { + if fkc.IdxIsPrimaryKey { + handleKey, err := fkc.buildHandleFromFKValues(sc, vals) + if err != nil { + return nil, false, err + } + key := tablecodec.EncodeRecordKey(fkc.Tbl.RecordPrefix(), handleKey) + if fkc.IdxIsExclusive { + return key, false, nil + } + return key, true, nil + } + key, distinct, err := fkc.Idx.GenIndexKey(sc, vals, nil, nil) + if err != nil { + return nil, false, err + } + if distinct && fkc.IdxIsExclusive { + return key, false, nil + } + return key, true, nil +} + +func (fkc *FKCheckExec) buildHandleFromFKValues(sc *stmtctx.StatementContext, vals []types.Datum) (kv.Handle, error) { + if len(vals) == 1 && fkc.Idx == nil { + return kv.IntHandle(vals[0].GetInt64()), nil + } + handleBytes, err := codec.EncodeKey(sc, nil, vals...) + if err != nil { + return nil, err + } + return kv.NewCommonHandle(handleBytes) +} + +func (fkc *FKCheckExec) checkKeys(ctx context.Context, txn kv.Transaction) error { + if len(fkc.toBeCheckedKeys) == 0 { + return nil + } + err := fkc.prefetchKeys(ctx, txn, fkc.toBeCheckedKeys) + if err != nil { + return err + } + for _, k := range fkc.toBeCheckedKeys { + err = fkc.checkKey(ctx, txn, k) + if err != nil { + return err + } + } + return nil +} + +func (fkc *FKCheckExec) prefetchKeys(ctx context.Context, txn kv.Transaction, keys []kv.Key) error { + // Fill cache using BatchGet + _, err := txn.BatchGet(ctx, keys) + if err != nil { + return err + } + return nil +} + +func (fkc *FKCheckExec) checkKey(ctx context.Context, txn kv.Transaction, k kv.Key) error { + if fkc.CheckExist { + return fkc.checkKeyExist(ctx, txn, k) + } + return fkc.checkKeyNotExist(ctx, txn, k) +} + +func (fkc *FKCheckExec) checkKeyExist(ctx context.Context, txn kv.Transaction, k kv.Key) error { + _, err := txn.Get(ctx, k) + if err == nil { + fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, k) + return nil + } + if kv.IsErrNotFound(err) { + return fkc.FailedErr + } + return err +} + +func (fkc *FKCheckExec) checkKeyNotExist(ctx context.Context, txn kv.Transaction, k kv.Key) error { + _, err := txn.Get(ctx, k) + if err == nil { + return fkc.FailedErr + } + if kv.IsErrNotFound(err) { + return nil + } + return err +} + +func (fkc *FKCheckExec) checkIndexKeys(ctx context.Context, txn kv.Transaction) error { + if len(fkc.toBeCheckedPrefixKeys) == 0 { + return nil + } + memBuffer := txn.GetMemBuffer() + snap := txn.GetSnapshot() + snap.SetOption(kv.ScanBatchSize, 2) + defer func() { + snap.SetOption(kv.ScanBatchSize, txnsnapshot.DefaultScanBatchSize) + }() + for _, key := range fkc.toBeCheckedPrefixKeys { + err := fkc.checkPrefixKey(ctx, memBuffer, snap, key) + if err != nil { + return err + } + } + return nil +} + +func (fkc *FKCheckExec) checkPrefixKey(ctx context.Context, memBuffer kv.MemBuffer, snap kv.Snapshot, key kv.Key) error { + key, value, err := fkc.getIndexKeyValueInTable(ctx, memBuffer, snap, key) + if err != nil { + return err + } + if fkc.CheckExist { + return fkc.checkPrefixKeyExist(key, value) + } + if len(value) > 0 { + // If check not exist, but the key is exist, return failedErr. + return fkc.FailedErr + } + return nil +} + +func (fkc *FKCheckExec) checkPrefixKeyExist(key kv.Key, value []byte) error { + exist := len(value) > 0 + if !exist { + return fkc.FailedErr + } + if fkc.Idx != nil && fkc.Idx.Meta().Primary && fkc.Tbl.Meta().IsCommonHandle { + fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, key) + } else { + handle, err := tablecodec.DecodeIndexHandle(key, value, len(fkc.Idx.Meta().Columns)) + if err != nil { + return err + } + handleKey := tablecodec.EncodeRecordKey(fkc.Tbl.RecordPrefix(), handle) + fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, handleKey) + } + return nil +} + +func (fkc *FKCheckExec) getIndexKeyValueInTable(ctx context.Context, memBuffer kv.MemBuffer, snap kv.Snapshot, key kv.Key) (k []byte, v []byte, _ error) { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } + memIter, err := memBuffer.Iter(key, key.PrefixNext()) + if err != nil { + return nil, nil, err + } + deletedKeys := set.NewStringSet() + defer memIter.Close() + for ; memIter.Valid(); err = memIter.Next() { + if err != nil { + return nil, nil, err + } + k := memIter.Key() + if !k.HasPrefix(key) { + break + } + // check whether the key was been deleted. + if len(memIter.Value()) > 0 { + return k, memIter.Value(), nil + } + deletedKeys.Insert(string(k)) + } + + it, err := snap.Iter(key, key.PrefixNext()) + if err != nil { + return nil, nil, err + } + defer it.Close() + for ; it.Valid(); err = it.Next() { + if err != nil { + return nil, nil, err + } + k := it.Key() + if !k.HasPrefix(key) { + break + } + if !deletedKeys.Exist(string(k)) { + return k, it.Value(), nil + } + } + return nil, nil, nil +} + +type fkValueHelper struct { + colsOffsets []int + fkValuesSet set.StringSet +} + +func (h *fkValueHelper) fetchFKValuesWithCheck(sc *stmtctx.StatementContext, row []types.Datum) ([]types.Datum, error) { + vals, err := h.fetchFKValues(row) + if err != nil || h.hasNullValue(vals) { + return nil, err + } + keyBuf, err := codec.EncodeKey(sc, nil, vals...) + if err != nil { + return nil, err + } + key := string(keyBuf) + if h.fkValuesSet.Exist(key) { + return nil, nil + } + h.fkValuesSet.Insert(key) + return vals, nil +} + +func (h *fkValueHelper) fetchFKValues(row []types.Datum) ([]types.Datum, error) { + vals := make([]types.Datum, len(h.colsOffsets)) + for i, offset := range h.colsOffsets { + if offset >= len(row) { + return nil, table.ErrIndexOutBound.GenWithStackByArgs("", offset, row) + } + vals[i] = row[offset] + } + return vals, nil +} + +func (h *fkValueHelper) hasNullValue(vals []types.Datum) bool { + // If any foreign key column value is null, no need to check this row. + // test case: + // create table t1 (id int key,a int, b int, index(a, b)); + // create table t2 (id int key,a int, b int, foreign key fk(a, b) references t1(a, b) ON DELETE CASCADE); + // > insert into t2 values (2, null, 1); + // Query OK, 1 row affected + // > insert into t2 values (3, 1, null); + // Query OK, 1 row affected + // > insert into t2 values (4, null, null); + // Query OK, 1 row affected + // > select * from t2; + // +----+--------+--------+ + // | id | a | b | + // +----+--------+--------+ + // | 4 | | | + // | 2 | | 1 | + // | 3 | 1 | | + // +----+--------+--------+ + for _, val := range vals { + if val.IsNull() { + return true + } + } + return false +} + +func getFKColumnsOffsets(tbInfo *model.TableInfo, cols []model.CIStr) ([]int, error) { + colsOffsets := make([]int, len(cols)) + for i, col := range cols { + offset := -1 + for i := range tbInfo.Columns { + if tbInfo.Columns[i].Name.L == col.L { + offset = tbInfo.Columns[i].Offset + break + } + } + if offset < 0 { + return nil, table.ErrUnknownColumn.GenWithStackByArgs(col.L) + } + colsOffsets[i] = offset + } + return colsOffsets, nil +} + +type fkCheckKey struct { + k kv.Key + isPrefix bool +} + +func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error { + if len(rows) == 0 { + return nil + } + if fkc.checkRowsCache == nil { + fkc.checkRowsCache = map[string]bool{} + } + fkCheckKeys := make([]*fkCheckKey, len(rows)) + prefetchKeys := make([]kv.Key, 0, len(rows)) + for i, r := range rows { + if r.ignored { + continue + } + vals, err := fkc.fetchFKValues(r.row) + if err != nil { + return err + } + if fkc.hasNullValue(vals) { + continue + } + key, isPrefix, err := fkc.buildCheckKeyFromFKValue(sc, vals) + if err != nil { + return err + } + fkCheckKeys[i] = &fkCheckKey{key, isPrefix} + if !isPrefix { + prefetchKeys = append(prefetchKeys, key) + } + } + if len(prefetchKeys) > 0 { + err := fkc.prefetchKeys(ctx, txn, prefetchKeys) + if err != nil { + return err + } + } + memBuffer := txn.GetMemBuffer() + snap := txn.GetSnapshot() + snap.SetOption(kv.ScanBatchSize, 2) + defer func() { + snap.SetOption(kv.ScanBatchSize, 256) + }() + for i, fkCheckKey := range fkCheckKeys { + if fkCheckKey == nil { + continue + } + k := fkCheckKey.k + if ignore, ok := fkc.checkRowsCache[string(k)]; ok { + if ignore { + rows[i].ignored = true + sc.AppendWarning(fkc.FailedErr) + } + continue + } + var err error + if fkCheckKey.isPrefix { + err = fkc.checkPrefixKey(ctx, memBuffer, snap, k) + } else { + err = fkc.checkKey(ctx, txn, k) + } + if err != nil { + rows[i].ignored = true + sc.AppendWarning(fkc.FailedErr) + fkc.checkRowsCache[string(k)] = true + } + fkc.checkRowsCache[string(k)] = false + } + return nil +} diff --git a/executor/foreign_key_test.go b/executor/foreign_key_test.go new file mode 100644 index 0000000000..2e68042a5a --- /dev/null +++ b/executor/foreign_key_test.go @@ -0,0 +1,474 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "fmt" + "sync" + "testing" + + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +var foreignKeyTestCase1 = []struct { + prepareSQLs []string + notNull bool +}{ + // Case-1: test unique index only contain foreign key columns. + { + prepareSQLs: []string{ + "create table t1 (id int, a int, b int, unique index(id), unique index(a, b));", + "create table t2 (b int, name varchar(10), a int, id int, unique index(id), unique index (a,b), foreign key fk(a, b) references t1(a, b));", + }, + }, + // Case-2: test unique index contain foreign key columns and other columns. + { + prepareSQLs: []string{ + "create table t1 (id int key, a int, b int, unique index(id), unique index(a, b, id));", + "create table t2 (b int, a int, id int key, name varchar(10), unique index (a,b, id), foreign key fk(a, b) references t1(a, b));", + }, + }, + // Case-3: test non-unique index only contain foreign key columns. + { + prepareSQLs: []string{ + "create table t1 (id int key,a int, b int, unique index(id), index(a, b));", + "create table t2 (b int, a int, name varchar(10), id int key, index (a, b), foreign key fk(a, b) references t1(a, b));", + }, + }, + // Case-4: test non-unique index contain foreign key columns and other columns. + { + prepareSQLs: []string{ + "create table t1 (id int key,a int, b int, unique index(id), index(a, b, id));", + "create table t2 (name varchar(10), b int, a int, id int key, index (a, b, id), foreign key fk(a, b) references t1(a, b));", + }, + }, + //Case-5: test primary key only contain foreign key columns, and disable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=0;", + "create table t1 (id int, a int, b int, unique index(id), primary key (a, b));", + "create table t2 (b int, name varchar(10), a int, id int, unique index(id), primary key (a, b), foreign key fk(a, b) references t1(a, b));", + }, + notNull: true, + }, + // Case-6: test primary key only contain foreign key columns, and enable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=1;", + "create table t1 (id int, a int, b int, unique index(id), primary key (a, b));", + "create table t2 (b int, a int, name varchar(10), id int, unique index(id), primary key (a, b), foreign key fk(a, b) references t1(a, b));", + }, + notNull: true, + }, + // Case-7: test primary key contain foreign key columns and other column, and disable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=0;", + "create table t1 (id int, a int, b int, unique index(id), primary key (a, b, id));", + "create table t2 (b int, a int, id int, name varchar(10), unique index(id), primary key (a, b, id), foreign key fk(a, b) references t1(a, b));", + }, + notNull: true, + }, + // Case-8: test primary key contain foreign key columns and other column, and enable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=1;", + "create table t1 (id int, a int, b int, unique index(id), primary key (a, b, id));", + "create table t2 (name varchar(10), b int, a int, id int, unique index(id), primary key (a, b, id), foreign key fk(a, b) references t1(a, b));", + }, + notNull: true, + }, +} + +func TestForeignKeyOnInsertChildTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + tk.MustExec("create table t_data (id int, a int, b int)") + tk.MustExec("insert into t_data (id, a, b) values (1, 1, 1), (2, 2, 2);") + for _, ca := range foreignKeyTestCase1 { + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + for _, sql := range ca.prepareSQLs { + tk.MustExec(sql) + } + tk.MustExec("insert into t1 (id, a, b) values (1, 1, 1);") + tk.MustExec("insert into t2 (id, a, b) values (1, 1, 1)") + if !ca.notNull { + tk.MustExec("insert into t2 (id, a, b) values (2, null, 1)") + tk.MustExec("insert into t2 (id, a, b) values (3, 1, null)") + tk.MustExec("insert into t2 (id, a, b) values (4, null, null)") + } + tk.MustGetDBError("insert into t2 (id, a, b) values (5, 1, 0);", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError("insert into t2 (id, a, b) values (6, 0, 1);", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError("insert into t2 (id, a, b) values (7, 2, 2);", plannercore.ErrNoReferencedRow2) + // Test insert from select. + tk.MustExec("delete from t2") + tk.MustExec("insert into t2 (id, a, b) select id, a, b from t_data where t_data.id=1") + tk.MustGetDBError("insert into t2 (id, a, b) select id, a, b from t_data where t_data.id=2", plannercore.ErrNoReferencedRow2) + + // Test in txn + tk.MustExec("delete from t2") + tk.MustExec("begin") + tk.MustExec("delete from t1 where a=1") + tk.MustGetDBError("insert into t2 (id, a, b) values (1, 1, 1)", plannercore.ErrNoReferencedRow2) + tk.MustExec("insert into t1 (id, a, b) values (2, 2, 2)") + tk.MustExec("insert into t2 (id, a, b) values (2, 2, 2)") + tk.MustExec("rollback") + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 1 1")) + tk.MustQuery("select id, a, b from t2 order by id").Check(testkit.Rows()) + } + + // Case-10: test primary key is handle and contain foreign key column, and foreign key column has default value. + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("set @@tidb_enable_clustered_index=0;") + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1 (id int,a int, primary key(id));") + tk.MustExec("create table t2 (id int key,a int not null default 0, index (a), foreign key fk(a) references t1(id));") + tk.MustExec("insert into t1 values (1, 1);") + tk.MustExec("insert into t2 values (1, 1);") + tk.MustGetDBError("insert into t2 (id) values (10);", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError("insert into t2 values (3, 2);", plannercore.ErrNoReferencedRow2) + + // Case-11: test primary key is handle and contain foreign key column, and foreign key column doesn't have default value. + tk.MustExec("drop table if exists t2;") + tk.MustExec("create table t2 (id int key,a int, index (a), foreign key fk(a) references t1(id));") + tk.MustExec("insert into t2 values (1, 1);") + tk.MustExec("insert into t2 (id) values (10);") + tk.MustGetDBError("insert into t2 values (3, 2);", plannercore.ErrNoReferencedRow2) +} + +func TestForeignKeyOnInsertDuplicateUpdateChildTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + for _, ca := range foreignKeyTestCase1 { + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + for _, sql := range ca.prepareSQLs { + tk.MustExec(sql) + } + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a')") + + sqls := []string{ + "insert into t2 (id, a, b, name) values (1, 12, 22, 'b') on duplicate key update a = 100", + "insert into t2 (id, a, b, name) values (1, 13, 23, 'c') on duplicate key update a = a+10", + "insert into t2 (id, a, b, name) values (1, 14, 24, 'd') on duplicate key update a = a + 100", + "insert into t2 (id, a, b, name) values (1, 14, 24, 'd') on duplicate key update a = 12, b = 23", + } + for _, sqlStr := range sqls { + tk.MustGetDBError(sqlStr, plannercore.ErrNoReferencedRow2) + } + tk.MustExec("insert into t2 (id, a, b, name) values (1, 14, 26, 'b') on duplicate key update a = 12, b = 22, name = 'x'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 12 22 x")) + if !ca.notNull { + tk.MustExec("insert into t2 (id, a, b, name) values (1, 14, 26, 'b') on duplicate key update a = null, b = 22, name = 'y'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 22 y")) + tk.MustExec("insert into t2 (id, a, b, name) values (1, 15, 26, 'b') on duplicate key update b = null, name = 'z'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 z")) + } + tk.MustExec("insert into t2 (id, a, b, name) values (1, 15, 26, 'b') on duplicate key update a=13,b=23, name = 'c'") + tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("1 13 23 c")) + + // Test In txn. + tk.MustExec("delete from t2") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (2, 11, 21, 'a')") + tk.MustExec("begin") + tk.MustExec("insert into t2 (id, a, b, name) values (2, 14, 26, 'b') on duplicate key update a = 12, b = 22, name = 'x'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("2 12 22 x")) + tk.MustExec("rollback") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("2 11 21 a")) + + tk.MustExec("begin") + tk.MustExec("delete from t1 where id=3") + tk.MustGetDBError("insert into t2 (id, a, b, name) values (2, 13, 23, 'y') on duplicate key update a = 13, b = 23, name = 'y'", plannercore.ErrNoReferencedRow2) + tk.MustExec("insert into t2 (id, a, b, name) values (2, 14, 24, 'z') on duplicate key update a = 14, b = 24, name = 'z'") + tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)") + tk.MustExec("insert into t2 (id, a, b, name) values (2, 15, 25, 'o') on duplicate key update a = 15, b = 25, name = 'o'") + tk.MustExec("delete from t1 where id=1") + tk.MustGetDBError("insert into t2 (id, a, b, name) values (2, 11, 21, 'y') on duplicate key update a = 11, b = 21, name = 'p'", plannercore.ErrNoReferencedRow2) + tk.MustExec("commit") + tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("2 15 25 o")) + } + + // Case-9: test primary key is handle and contain foreign key column. + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("set @@tidb_enable_clustered_index=0;") + tk.MustExec("create table t1 (id int, a int, b int, primary key (id));") + tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));") + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')") + + tk.MustExec("insert into t2 (id, a) values (11, 1) on duplicate key update a = 2, name = 'b'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 2 21 b")) + tk.MustExec("insert into t2 (id, a, b) values (11, 2, 22) on duplicate key update a = 3, name = 'c'") + tk.MustExec("insert into t2 (id, a, name) values (11, 3, 'b') on duplicate key update b = b+10, name = 'd'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 3 31 d")) + tk.MustExec("insert into t2 (id, a, name) values (11, 3, 'b') on duplicate key update id = 1, name = 'f'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 31 f")) + tk.MustGetDBError("insert into t2 (id, a, name) values (1, 3, 'b') on duplicate key update a = 10", plannercore.ErrNoReferencedRow2) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 31 f")) + + // Test In txn. + tk.MustExec("delete from t2") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (1, 1, 21, 'a')") + tk.MustExec("begin") + tk.MustExec("insert into t2 (id, a) values (11, 1) on duplicate key update a = 2, name = 'b'") + tk.MustExec("rollback") + + tk.MustExec("begin") + tk.MustExec("delete from t1 where id=2") + tk.MustGetDBError("insert into t2 (id, a) values (1, 1) on duplicate key update a = 2, name = 'b'", plannercore.ErrNoReferencedRow2) + tk.MustExec("insert into t2 (id, a) values (1, 1) on duplicate key update a = 3, name = 'c'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 21 c")) + tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)") + tk.MustExec("insert into t2 (id, a) values (3, 3) on duplicate key update a = 5, name = 'd'") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 5 21 d")) + tk.MustExec("delete from t1 where id=1") + tk.MustGetDBError("insert into t2 (id, a) values (1, 5) on duplicate key update a = 1, name = 'e'", plannercore.ErrNoReferencedRow2) + tk.MustExec("commit") + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 5 21 d")) +} + +func TestForeignKeyCheckAndLock(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("set @@foreign_key_checks=1") + tk2.MustExec("use test") + + cases := []struct { + prepareSQLs []string + }{ + // Case-1: test unique index only contain foreign key columns. + { + prepareSQLs: []string{ + "create table t1 (id int, name varchar(10), unique index (id))", + "create table t2 (a int, name varchar(10), unique index (a), foreign key fk(a) references t1(id))", + }, + }, + //Case-2: test unique index contain foreign key columns and other columns. + { + prepareSQLs: []string{ + "create table t1 (id int, name varchar(10), unique index (id, name))", + "create table t2 (name varchar(10), a int, unique index (a, name), foreign key fk(a) references t1(id))", + }, + }, + //Case-3: test non-unique index only contain foreign key columns. + { + prepareSQLs: []string{ + "create table t1 (id int, name varchar(10), index (id))", + "create table t2 (a int, name varchar(10), index (a), foreign key fk(a) references t1(id))", + }, + }, + //Case-4: test non-unique index contain foreign key columns and other columns. + { + prepareSQLs: []string{ + "create table t1 (id int, name varchar(10), index (id, name))", + "create table t2 (name varchar(10), a int, index (a, name), foreign key fk(a) references t1(id))", + }, + }, + //Case-5: test primary key only contain foreign key columns, and disable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=0;", + "create table t1 (id int, name varchar(10), primary key (id))", + "create table t2 (a int, name varchar(10), primary key (a), foreign key fk(a) references t1(id))", + }, + }, + //Case-6: test primary key only contain foreign key columns, and enable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=1;", + "create table t1 (id int, name varchar(10), primary key (id))", + "create table t2 (a int, name varchar(10), primary key (a), foreign key fk(a) references t1(id))", + }, + }, + //Case-7: test primary key contain foreign key columns and other column, and disable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=0;", + "create table t1 (id int, name varchar(10), primary key (id, name))", + "create table t2 (a int, name varchar(10), primary key (a , name), foreign key fk(a) references t1(id))", + }, + }, + // Case-8: test primary key contain foreign key columns and other column, and enable tidb_enable_clustered_index. + { + prepareSQLs: []string{ + "set @@tidb_enable_clustered_index=1;", + "create table t1 (id int, name varchar(10), primary key (id, name))", + "create table t2 (a int, name varchar(10), primary key (a , name), foreign key fk(a) references t1(id))", + }, + }, + } + + for _, ca := range cases { + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + for _, sql := range ca.prepareSQLs { + tk.MustExec(sql) + } + // Test in optimistic txn + tk.MustExec("insert into t1 (id, name) values (1, 'a');") + // Test insert child table + tk.MustExec("begin optimistic") + tk.MustExec("insert into t2 (a, name) values (1, 'a');") + tk2.MustExec("delete from t1 where id = 1") + err := tk.ExecToErr("commit") + require.Error(t, err) + require.Contains(t, err.Error(), "Write conflict") + } +} + +func TestForeignKeyOnInsertIgnore(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + tk.MustExec("CREATE TABLE t1 (i INT PRIMARY KEY);") + tk.MustExec("CREATE TABLE t2 (i INT, FOREIGN KEY (i) REFERENCES t1 (i));") + tk.MustExec("INSERT INTO t1 VALUES (1),(3);") + tk.MustExec("INSERT IGNORE INTO t2 VALUES (1),(2),(3),(4);") + warning := "Warning 1452 Cannot add or update a child row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk_1` FOREIGN KEY (`i`) REFERENCES `t1` (`i`))" + tk.MustQuery("show warnings;").Check(testkit.Rows(warning, warning)) + tk.MustQuery("select * from t2").Check(testkit.Rows("1", "3")) +} + +func TestForeignKeyOnInsertOnDuplicateParentTableCheck(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + for _, ca := range foreignKeyTestCase1 { + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + for _, sql := range ca.prepareSQLs { + tk.MustExec(sql) + } + if !ca.notNull { + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24), (5, 15, null), (6, null, 26), (7, null, null);") + tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a'), (5, 15, null, 'e'), (6, null, 26, 'f'), (7, null, null, 'g');") + + tk.MustExec("insert into t1 (id, a) values (2, 12) on duplicate key update a=a+100, b=b+200") + tk.MustExec("insert into t1 (id, a) values (3, 13), (2, 12) on duplicate key update a=a+1000, b=b+2000") + tk.MustExec("insert into t1 (id) values (5), (6), (7) on duplicate key update a=a+10000, b=b+20000") + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 ", "6 20026", "7 ")) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 e", "6 26 f", "7 g")) + + tk.MustGetDBError("insert into t1 (id, a) values (1, 11) on duplicate key update a=a+10, b=b+20", plannercore.ErrRowIsReferenced2) + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 ", "6 20026", "7 ")) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 e", "6 26 f", "7 g")) + } else { + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a');") + + tk.MustExec("insert into t1 (id, a, b) values (2, 12, 22) on duplicate key update a=a+100, b=b+200") + tk.MustExec("insert into t1 (id, a, b) values (3, 13, 23), (2, 12, 22) on duplicate key update a=a+1000, b=b+2000") + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24")) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a")) + + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21) on duplicate key update id=11") + tk.MustGetDBError("insert into t1 (id, a, b) values (1, 11, 21) on duplicate key update a=a+10, b=b+20", plannercore.ErrRowIsReferenced2) + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("2 1112 2222", "3 1013 2023", "4 14 24", "11 11 21")) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a")) + } + } + + // Case-9: test primary key is handle and contain foreign key column. + tk.MustExec("drop table if exists t2;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("set @@tidb_enable_clustered_index=0;") + tk.MustExec("create table t1 (id int, a int, b int, primary key (id));") + tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));") + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)") + tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')") + + tk.MustExec("insert into t1 (id, a, b) values (2, 0, 0), (3, 0, 0) on duplicate key update id=id+100") + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "4 14 24", "102 12 22", "103 13 23")) + + tk.MustExec("insert into t1 (id, a, b) values (1, 0, 0) on duplicate key update a=a+100") + tk.MustGetDBError("insert into t1 (id, a, b) values (1, 0, 0) on duplicate key update id=100+id", plannercore.ErrRowIsReferenced2) + tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 111 21", "4 14 24", "102 12 22", "103 13 23")) + tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 1 21 a")) +} + +func TestForeignKey(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + + // Test table has more than 1 foreign keys. + tk.MustExec("create table t1 (id int, a int, b int, primary key (id));") + tk.MustExec("create table t2 (id int, a int, b int, primary key (id));") + tk.MustExec("create table t3 (b int, a int, id int, primary key (a), foreign key (a) references t1(id), foreign key (b) references t2(id));") + tk.MustExec("insert into t1 (id, a, b) values (1, 11, 111), (2, 22, 222);") + tk.MustExec("insert into t2 (id, a, b) values (2, 22, 222);") + tk.MustGetDBError("insert into t3 (id, a, b) values (1, 1, 1)", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError("insert into t3 (id, a, b) values (2, 3, 2)", plannercore.ErrNoReferencedRow2) +} + +func TestForeignKeyConcurrentInsertChildTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + tk.MustExec("create table t1 (id int, a int, primary key (id));") + tk.MustExec("create table t2 (id int, a int, index(a), foreign key fk(a) references t1(id));") + tk.MustExec("insert into t1 (id, a) values (1, 11),(2, 12), (3, 13), (4, 14)") + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_foreign_key=1") + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + for cnt := 0; cnt < 20; cnt++ { + id := cnt%4 + 1 + sql := fmt.Sprintf("insert into t2 (id, a) values (%v, %v)", cnt, id) + tk.MustExec(sql) + } + }() + } + wg.Wait() +} diff --git a/executor/insert.go b/executor/insert.go index 7c4b84f5f1..29c0231ed4 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -420,7 +420,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo } newData := e.row4Update[:len(oldRow)] - _, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker) + _, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks) if err != nil { return err } @@ -448,3 +448,8 @@ func (e *InsertExec) setMessage() { stmtCtx.SetMessage(msg) } } + +// GetFKChecks implements WithForeignKeyTrigger interface. +func (e *InsertExec) GetFKChecks() []*FKCheckExec { + return e.fkChecks +} diff --git a/executor/insert_common.go b/executor/insert_common.go index daeff19050..d708a59770 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -97,6 +97,8 @@ type InsertValues struct { // We use mutex to protect routine from using invalid txn. isLoadData bool txnInUse sync.Mutex + // fkChecks contains the foreign key checkers. + fkChecks []*FKCheckExec } type defaultVal struct { @@ -1126,6 +1128,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D defer snapshot.SetOption(kv.CollectRuntimeStats, nil) } } + sc := e.ctx.GetSessionVars().StmtCtx + for _, fkc := range e.fkChecks { + err = fkc.checkRows(ctx, sc, txn, toBeCheckedRows) + if err != nil { + return err + } + } prefetchStart := time.Now() // Fill cache using BatchGet, the following Get requests don't need to visit TiKV. // Temporary table need not to do prefetch because its all data are stored in the memory. @@ -1265,6 +1274,14 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types. if e.lastInsertID != 0 { vars.SetLastInsertID(e.lastInsertID) } + if !vars.StmtCtx.BatchCheck { + for _, fkc := range e.fkChecks { + err = fkc.insertRowNeedToCheck(vars.StmtCtx, row) + if err != nil { + return err + } + } + } return nil } diff --git a/executor/update.go b/executor/update.go index e8c65d18f9..463bf92a40 100644 --- a/executor/update.go +++ b/executor/update.go @@ -191,7 +191,7 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n flags := bAssignFlag[content.Start:content.End] // Update row - changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker) + changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, nil) if err1 == nil { _, exist := e.updatedRowKeys[content.Start].Get(handle) memDelta := e.updatedRowKeys[content.Start].Set(handle, changed) diff --git a/executor/write.go b/executor/write.go index c6b30e7704..8d4336b07b 100644 --- a/executor/write.go +++ b/executor/write.go @@ -51,7 +51,7 @@ var ( // 1. changed (bool) : does the update really change the row values. e.g. update set i = 1 where i = 1; // 2. err (error) : error in the update. func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table, - onDup bool, memTracker *memory.Tracker) (bool, error) { + onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec) (bool, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -214,6 +214,12 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old return false, err } } + for _, fkt := range fkChecks { + err := fkt.updateRowNeedToCheck(sc, oldData, newData) + if err != nil { + return false, err + } + } if onDup { sc.AddAffectedRows(2) } else { diff --git a/kv/option.go b/kv/option.go index 5ecf120802..888a1e24f0 100644 --- a/kv/option.go +++ b/kv/option.go @@ -91,6 +91,8 @@ const ( RequestSourceType // ReplicaReadAdjuster set the adjust function of cop requsts. ReplicaReadAdjuster + // ScanBatchSize set the iter scan batch size. + ScanBatchSize ) // ReplicaReadType is the type of replica to read data from diff --git a/parser/model/model.go b/parser/model/model.go index 75ae852036..34f16a72a6 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1566,7 +1566,7 @@ func (fk *FKInfo) String(db, tb string) string { buf.WriteString(fk.Name.O + "` FOREIGN KEY (") for i, col := range fk.Cols { if i > 0 { - buf.WriteByte(byte(',')) + buf.WriteString(", ") } buf.WriteString("`" + col.O + "`") } @@ -1579,7 +1579,7 @@ func (fk *FKInfo) String(db, tb string) string { buf.WriteString("` (") for i, col := range fk.RefCols { if i > 0 { - buf.WriteByte(byte(',')) + buf.WriteString(", ") } buf.WriteString("`" + col.O + "`") } diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 0518ae23a7..988f4f54c8 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -323,6 +323,8 @@ type Insert struct { AllAssignmentsAreConstant bool RowLen int + + FKChecks []*FKCheck } // Update represents Update plan. diff --git a/planner/core/errors.go b/planner/core/errors.go index bf7db49e10..2662ae726f 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -111,8 +111,10 @@ var ( ErrKeyPart0 = dbterror.ClassOptimizer.NewStd(mysql.ErrKeyPart0) ErrGettingNoopVariable = dbterror.ClassOptimizer.NewStd(mysql.ErrGettingNoopVariable) - ErrPrepareMulti = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareMulti) - ErrUnsupportedPs = dbterror.ClassExecutor.NewStd(mysql.ErrUnsupportedPs) - ErrPsManyParam = dbterror.ClassExecutor.NewStd(mysql.ErrPsManyParam) - ErrPrepareDDL = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareDDL) + ErrPrepareMulti = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareMulti) + ErrUnsupportedPs = dbterror.ClassExecutor.NewStd(mysql.ErrUnsupportedPs) + ErrPsManyParam = dbterror.ClassExecutor.NewStd(mysql.ErrPsManyParam) + ErrPrepareDDL = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareDDL) + ErrRowIsReferenced2 = dbterror.ClassOptimizer.NewStd(mysql.ErrRowIsReferenced2) + ErrNoReferencedRow2 = dbterror.ClassOptimizer.NewStd(mysql.ErrNoReferencedRow2) ) diff --git a/planner/core/foreign_key.go b/planner/core/foreign_key.go new file mode 100644 index 0000000000..5cb7305a74 --- /dev/null +++ b/planner/core/foreign_key.go @@ -0,0 +1,176 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" +) + +// FKCheck indicates the foreign key constraint checker. +type FKCheck struct { + FK *model.FKInfo + ReferredFK *model.ReferredFKInfo + Tbl table.Table + Idx table.Index + Cols []model.CIStr + + IdxIsPrimaryKey bool + IdxIsExclusive bool + + CheckExist bool + FailedErr error +} + +func (p *Insert) buildOnInsertFKChecks(ctx sessionctx.Context, is infoschema.InfoSchema, dbName string) ([]*FKCheck, error) { + if !ctx.GetSessionVars().ForeignKeyChecks { + return nil, nil + } + tblInfo := p.Table.Meta() + fkChecks := make([]*FKCheck, 0, len(tblInfo.ForeignKeys)) + updateCols := p.buildOnDuplicateUpdateColumns() + if len(updateCols) > 0 { + referredFKChecks, err := buildOnUpdateReferredFKChecks(is, dbName, tblInfo, updateCols) + if err != nil { + return nil, err + } + if len(referredFKChecks) > 0 { + fkChecks = append(fkChecks, referredFKChecks...) + } + } + for _, fk := range tblInfo.ForeignKeys { + if fk.Version < 1 { + continue + } + failedErr := ErrNoReferencedRow2.FastGenByArgs(fk.String(dbName, tblInfo.Name.L)) + fkCheck, err := buildFKCheckOnModifyChildTable(is, fk, failedErr) + if err != nil { + return nil, err + } + if fkCheck != nil { + fkChecks = append(fkChecks, fkCheck) + } + } + return fkChecks, nil +} + +func (p *Insert) buildOnDuplicateUpdateColumns() map[string]struct{} { + m := make(map[string]struct{}) + for _, assign := range p.OnDuplicate { + m[assign.ColName.L] = struct{}{} + } + return m +} + +func buildOnUpdateReferredFKChecks(is infoschema.InfoSchema, dbName string, tblInfo *model.TableInfo, updateCols map[string]struct{}) ([]*FKCheck, error) { + referredFKs := is.GetTableReferredForeignKeys(dbName, tblInfo.Name.L) + fkChecks := make([]*FKCheck, 0, len(referredFKs)) + for _, referredFK := range referredFKs { + if !isMapContainAnyCols(updateCols, referredFK.Cols...) { + continue + } + fkCheck, err := buildFKCheckOnModifyReferTable(is, referredFK) + if err != nil { + return nil, err + } + if fkCheck != nil { + fkChecks = append(fkChecks, fkCheck) + } + } + return fkChecks, nil +} + +func isMapContainAnyCols(colsMap map[string]struct{}, cols ...model.CIStr) bool { + for _, col := range cols { + _, exist := colsMap[col.L] + if exist { + return true + } + } + return false +} + +func buildFKCheckOnModifyChildTable(is infoschema.InfoSchema, fk *model.FKInfo, failedErr error) (*FKCheck, error) { + referTable, err := is.TableByName(fk.RefSchema, fk.RefTable) + if err != nil { + return nil, nil + } + fkCheck, err := buildFKCheck(referTable, fk.RefCols, failedErr) + if err != nil { + return nil, err + } + fkCheck.CheckExist = true + fkCheck.FK = fk + return fkCheck, nil +} + +func buildFKCheckOnModifyReferTable(is infoschema.InfoSchema, referredFK *model.ReferredFKInfo) (*FKCheck, error) { + childTable, err := is.TableByName(referredFK.ChildSchema, referredFK.ChildTable) + if err != nil { + return nil, nil + } + fk := model.FindFKInfoByName(childTable.Meta().ForeignKeys, referredFK.ChildFKName.L) + if fk == nil || fk.Version < 1 { + return nil, nil + } + failedErr := ErrRowIsReferenced2.GenWithStackByArgs(fk.String(referredFK.ChildSchema.L, referredFK.ChildTable.L)) + fkCheck, err := buildFKCheck(childTable, fk.Cols, failedErr) + if err != nil { + return nil, err + } + fkCheck.CheckExist = false + fkCheck.ReferredFK = referredFK + return fkCheck, nil +} + +func buildFKCheck(tbl table.Table, cols []model.CIStr, failedErr error) (*FKCheck, error) { + tblInfo := tbl.Meta() + if tblInfo.PKIsHandle && len(cols) == 1 { + refColInfo := model.FindColumnInfo(tblInfo.Columns, cols[0].L) + if refColInfo != nil && mysql.HasPriKeyFlag(refColInfo.GetFlag()) { + return &FKCheck{ + Tbl: tbl, + IdxIsPrimaryKey: true, + IdxIsExclusive: true, + FailedErr: failedErr, + }, nil + } + } + + referTbIdxInfo := model.FindIndexByColumns(tblInfo, cols...) + if referTbIdxInfo == nil { + return nil, failedErr + } + var tblIdx table.Index + for _, idx := range tbl.Indices() { + if idx.Meta().ID == referTbIdxInfo.ID { + tblIdx = idx + } + } + if tblIdx == nil { + return nil, failedErr + } + + return &FKCheck{ + Tbl: tbl, + Idx: tblIdx, + IdxIsExclusive: len(cols) == len(referTbIdxInfo.Columns), + IdxIsPrimaryKey: referTbIdxInfo.Primary && tblInfo.IsCommonHandle, + FailedErr: failedErr, + }, nil +} diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index dde78645bb..888326c0d1 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3590,6 +3590,10 @@ func (b *PlanBuilder) buildInsert(ctx context.Context, insert *ast.InsertStmt) ( } err = insertPlan.ResolveIndices() + if err != nil { + return nil, err + } + insertPlan.FKChecks, err = insertPlan.buildOnInsertFKChecks(b.ctx, b.is, tn.DBInfo.Name.L) return insertPlan, err } diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index ae84351e56..66c82f86ea 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -130,6 +130,11 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { s.KVSnapshot.SetRequestSourceType(val.(string)) case kv.ReplicaReadAdjuster: s.KVSnapshot.SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster)) + case kv.ScanBatchSize: + size := val.(int) + if size > 0 { + s.KVSnapshot.SetScanBatchSize(size) + } } }