*: add foreign key constraint check when execute insert statement (#37466)
close pingcap/tidb#37465
This commit is contained in:
10
errors.toml
10
errors.toml
@ -1936,6 +1936,16 @@ error = '''
|
||||
Key part '%-.192s' length cannot be 0
|
||||
'''
|
||||
|
||||
["planner:1451"]
|
||||
error = '''
|
||||
Cannot delete or update a parent row: a foreign key constraint fails (%.192s)
|
||||
'''
|
||||
|
||||
["planner:1452"]
|
||||
error = '''
|
||||
Cannot add or update a child row: a foreign key constraint fails (%.192s)
|
||||
'''
|
||||
|
||||
["planner:1462"]
|
||||
error = '''
|
||||
`%-.192s`.`%-.192s` contains view recursion
|
||||
|
||||
@ -536,6 +536,10 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
|
||||
}
|
||||
|
||||
if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled || err != nil {
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
err = a.handleForeignKeyTrigger(ctx, e)
|
||||
return result, err
|
||||
}
|
||||
|
||||
@ -555,6 +559,21 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor) error {
|
||||
exec, ok := e.(WithForeignKeyTrigger)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
fkChecks := exec.GetFKChecks()
|
||||
for _, fkCheck := range fkChecks {
|
||||
err := fkCheck.doCheck(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic bool) (handled bool, rs sqlexec.RecordSet, err error) {
|
||||
sc := a.Ctx.GetSessionVars().StmtCtx
|
||||
defer func() {
|
||||
|
||||
@ -888,6 +888,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
|
||||
b.err = err
|
||||
return nil
|
||||
}
|
||||
ivs.fkChecks, err = buildFKCheckExecs(b.ctx, ivs.Table, v.FKChecks)
|
||||
if err != nil {
|
||||
b.err = err
|
||||
return nil
|
||||
}
|
||||
|
||||
if v.IsReplace {
|
||||
return b.buildReplace(ivs)
|
||||
|
||||
489
executor/foreign_key.go
Normal file
489
executor/foreign_key.go
Normal file
@ -0,0 +1,489 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
plannercore "github.com/pingcap/tidb/planner/core"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/tablecodec"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"github.com/pingcap/tidb/util/set"
|
||||
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
|
||||
)
|
||||
|
||||
// WithForeignKeyTrigger indicates the executor has foreign key check or cascade.
|
||||
type WithForeignKeyTrigger interface {
|
||||
GetFKChecks() []*FKCheckExec
|
||||
}
|
||||
|
||||
// FKCheckExec uses to check foreign key constraint.
|
||||
// When insert/update child table, need to check the row has related row exists in refer table.
|
||||
// When insert/update parent table, need to check the row doesn't have related row exists in refer table.
|
||||
type FKCheckExec struct {
|
||||
*plannercore.FKCheck
|
||||
*fkValueHelper
|
||||
ctx sessionctx.Context
|
||||
|
||||
toBeCheckedKeys []kv.Key
|
||||
toBeCheckedPrefixKeys []kv.Key
|
||||
toBeLockedKeys []kv.Key
|
||||
|
||||
checkRowsCache map[string]bool
|
||||
}
|
||||
|
||||
func buildFKCheckExecs(sctx sessionctx.Context, tbl table.Table, fkChecks []*plannercore.FKCheck) ([]*FKCheckExec, error) {
|
||||
fkCheckExecs := make([]*FKCheckExec, 0, len(fkChecks))
|
||||
for _, fkCheck := range fkChecks {
|
||||
fkCheckExec, err := buildFKCheckExec(sctx, tbl, fkCheck)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fkCheckExec != nil {
|
||||
fkCheckExecs = append(fkCheckExecs, fkCheckExec)
|
||||
}
|
||||
}
|
||||
return fkCheckExecs, nil
|
||||
}
|
||||
|
||||
func buildFKCheckExec(sctx sessionctx.Context, tbl table.Table, fkCheck *plannercore.FKCheck) (*FKCheckExec, error) {
|
||||
var cols []model.CIStr
|
||||
if fkCheck.FK != nil {
|
||||
cols = fkCheck.FK.Cols
|
||||
} else if fkCheck.ReferredFK != nil {
|
||||
cols = fkCheck.ReferredFK.Cols
|
||||
}
|
||||
colsOffsets, err := getFKColumnsOffsets(tbl.Meta(), cols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
helper := &fkValueHelper{
|
||||
colsOffsets: colsOffsets,
|
||||
fkValuesSet: set.NewStringSet(),
|
||||
}
|
||||
return &FKCheckExec{
|
||||
ctx: sctx,
|
||||
FKCheck: fkCheck,
|
||||
fkValueHelper: helper,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) insertRowNeedToCheck(sc *stmtctx.StatementContext, row []types.Datum) error {
|
||||
return fkc.addRowNeedToCheck(sc, row)
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) updateRowNeedToCheck(sc *stmtctx.StatementContext, oldRow, newRow []types.Datum) error {
|
||||
if fkc.FK != nil {
|
||||
return fkc.addRowNeedToCheck(sc, newRow)
|
||||
} else if fkc.ReferredFK != nil {
|
||||
return fkc.addRowNeedToCheck(sc, oldRow)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) addRowNeedToCheck(sc *stmtctx.StatementContext, row []types.Datum) error {
|
||||
vals, err := fkc.fetchFKValuesWithCheck(sc, row)
|
||||
if err != nil || len(vals) == 0 {
|
||||
return err
|
||||
}
|
||||
key, isPrefix, err := fkc.buildCheckKeyFromFKValue(sc, vals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isPrefix {
|
||||
fkc.toBeCheckedPrefixKeys = append(fkc.toBeCheckedPrefixKeys, key)
|
||||
} else {
|
||||
fkc.toBeCheckedKeys = append(fkc.toBeCheckedKeys, key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
|
||||
txn, err := fkc.ctx.Txn(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fkc.checkKeys(ctx, txn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fkc.checkIndexKeys(ctx, txn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(fkc.toBeLockedKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
sessVars := fkc.ctx.GetSessionVars()
|
||||
lockCtx, err := newLockCtx(fkc.ctx, sessVars.LockWaitTimeout, len(fkc.toBeLockedKeys))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// WARN: Since tidb current doesn't support `LOCK IN SHARE MODE`, therefore, performance will be very poor in concurrency cases.
|
||||
// TODO(crazycs520):After TiDB support `LOCK IN SHARE MODE`, use `LOCK IN SHARE MODE` here.
|
||||
forUpdate := atomic.LoadUint32(&sessVars.TxnCtx.ForUpdate)
|
||||
err = doLockKeys(ctx, fkc.ctx, lockCtx, fkc.toBeLockedKeys...)
|
||||
// doLockKeys may set TxnCtx.ForUpdate to 1, then if the lock meet write conflict, TiDB can't retry for update.
|
||||
// So reset TxnCtx.ForUpdate to 0 then can be retry if meet write conflict.
|
||||
atomic.StoreUint32(&sessVars.TxnCtx.ForUpdate, forUpdate)
|
||||
return err
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) buildCheckKeyFromFKValue(sc *stmtctx.StatementContext, vals []types.Datum) (key kv.Key, isPrefix bool, err error) {
|
||||
if fkc.IdxIsPrimaryKey {
|
||||
handleKey, err := fkc.buildHandleFromFKValues(sc, vals)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
key := tablecodec.EncodeRecordKey(fkc.Tbl.RecordPrefix(), handleKey)
|
||||
if fkc.IdxIsExclusive {
|
||||
return key, false, nil
|
||||
}
|
||||
return key, true, nil
|
||||
}
|
||||
key, distinct, err := fkc.Idx.GenIndexKey(sc, vals, nil, nil)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if distinct && fkc.IdxIsExclusive {
|
||||
return key, false, nil
|
||||
}
|
||||
return key, true, nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) buildHandleFromFKValues(sc *stmtctx.StatementContext, vals []types.Datum) (kv.Handle, error) {
|
||||
if len(vals) == 1 && fkc.Idx == nil {
|
||||
return kv.IntHandle(vals[0].GetInt64()), nil
|
||||
}
|
||||
handleBytes, err := codec.EncodeKey(sc, nil, vals...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return kv.NewCommonHandle(handleBytes)
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkKeys(ctx context.Context, txn kv.Transaction) error {
|
||||
if len(fkc.toBeCheckedKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
err := fkc.prefetchKeys(ctx, txn, fkc.toBeCheckedKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, k := range fkc.toBeCheckedKeys {
|
||||
err = fkc.checkKey(ctx, txn, k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) prefetchKeys(ctx context.Context, txn kv.Transaction, keys []kv.Key) error {
|
||||
// Fill cache using BatchGet
|
||||
_, err := txn.BatchGet(ctx, keys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkKey(ctx context.Context, txn kv.Transaction, k kv.Key) error {
|
||||
if fkc.CheckExist {
|
||||
return fkc.checkKeyExist(ctx, txn, k)
|
||||
}
|
||||
return fkc.checkKeyNotExist(ctx, txn, k)
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkKeyExist(ctx context.Context, txn kv.Transaction, k kv.Key) error {
|
||||
_, err := txn.Get(ctx, k)
|
||||
if err == nil {
|
||||
fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, k)
|
||||
return nil
|
||||
}
|
||||
if kv.IsErrNotFound(err) {
|
||||
return fkc.FailedErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkKeyNotExist(ctx context.Context, txn kv.Transaction, k kv.Key) error {
|
||||
_, err := txn.Get(ctx, k)
|
||||
if err == nil {
|
||||
return fkc.FailedErr
|
||||
}
|
||||
if kv.IsErrNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkIndexKeys(ctx context.Context, txn kv.Transaction) error {
|
||||
if len(fkc.toBeCheckedPrefixKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
memBuffer := txn.GetMemBuffer()
|
||||
snap := txn.GetSnapshot()
|
||||
snap.SetOption(kv.ScanBatchSize, 2)
|
||||
defer func() {
|
||||
snap.SetOption(kv.ScanBatchSize, txnsnapshot.DefaultScanBatchSize)
|
||||
}()
|
||||
for _, key := range fkc.toBeCheckedPrefixKeys {
|
||||
err := fkc.checkPrefixKey(ctx, memBuffer, snap, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkPrefixKey(ctx context.Context, memBuffer kv.MemBuffer, snap kv.Snapshot, key kv.Key) error {
|
||||
key, value, err := fkc.getIndexKeyValueInTable(ctx, memBuffer, snap, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fkc.CheckExist {
|
||||
return fkc.checkPrefixKeyExist(key, value)
|
||||
}
|
||||
if len(value) > 0 {
|
||||
// If check not exist, but the key is exist, return failedErr.
|
||||
return fkc.FailedErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) checkPrefixKeyExist(key kv.Key, value []byte) error {
|
||||
exist := len(value) > 0
|
||||
if !exist {
|
||||
return fkc.FailedErr
|
||||
}
|
||||
if fkc.Idx != nil && fkc.Idx.Meta().Primary && fkc.Tbl.Meta().IsCommonHandle {
|
||||
fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, key)
|
||||
} else {
|
||||
handle, err := tablecodec.DecodeIndexHandle(key, value, len(fkc.Idx.Meta().Columns))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
handleKey := tablecodec.EncodeRecordKey(fkc.Tbl.RecordPrefix(), handle)
|
||||
fkc.toBeLockedKeys = append(fkc.toBeLockedKeys, handleKey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkc *FKCheckExec) getIndexKeyValueInTable(ctx context.Context, memBuffer kv.MemBuffer, snap kv.Snapshot, key kv.Key) (k []byte, v []byte, _ error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
memIter, err := memBuffer.Iter(key, key.PrefixNext())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
deletedKeys := set.NewStringSet()
|
||||
defer memIter.Close()
|
||||
for ; memIter.Valid(); err = memIter.Next() {
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
k := memIter.Key()
|
||||
if !k.HasPrefix(key) {
|
||||
break
|
||||
}
|
||||
// check whether the key was been deleted.
|
||||
if len(memIter.Value()) > 0 {
|
||||
return k, memIter.Value(), nil
|
||||
}
|
||||
deletedKeys.Insert(string(k))
|
||||
}
|
||||
|
||||
it, err := snap.Iter(key, key.PrefixNext())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer it.Close()
|
||||
for ; it.Valid(); err = it.Next() {
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
k := it.Key()
|
||||
if !k.HasPrefix(key) {
|
||||
break
|
||||
}
|
||||
if !deletedKeys.Exist(string(k)) {
|
||||
return k, it.Value(), nil
|
||||
}
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
type fkValueHelper struct {
|
||||
colsOffsets []int
|
||||
fkValuesSet set.StringSet
|
||||
}
|
||||
|
||||
func (h *fkValueHelper) fetchFKValuesWithCheck(sc *stmtctx.StatementContext, row []types.Datum) ([]types.Datum, error) {
|
||||
vals, err := h.fetchFKValues(row)
|
||||
if err != nil || h.hasNullValue(vals) {
|
||||
return nil, err
|
||||
}
|
||||
keyBuf, err := codec.EncodeKey(sc, nil, vals...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key := string(keyBuf)
|
||||
if h.fkValuesSet.Exist(key) {
|
||||
return nil, nil
|
||||
}
|
||||
h.fkValuesSet.Insert(key)
|
||||
return vals, nil
|
||||
}
|
||||
|
||||
func (h *fkValueHelper) fetchFKValues(row []types.Datum) ([]types.Datum, error) {
|
||||
vals := make([]types.Datum, len(h.colsOffsets))
|
||||
for i, offset := range h.colsOffsets {
|
||||
if offset >= len(row) {
|
||||
return nil, table.ErrIndexOutBound.GenWithStackByArgs("", offset, row)
|
||||
}
|
||||
vals[i] = row[offset]
|
||||
}
|
||||
return vals, nil
|
||||
}
|
||||
|
||||
func (h *fkValueHelper) hasNullValue(vals []types.Datum) bool {
|
||||
// If any foreign key column value is null, no need to check this row.
|
||||
// test case:
|
||||
// create table t1 (id int key,a int, b int, index(a, b));
|
||||
// create table t2 (id int key,a int, b int, foreign key fk(a, b) references t1(a, b) ON DELETE CASCADE);
|
||||
// > insert into t2 values (2, null, 1);
|
||||
// Query OK, 1 row affected
|
||||
// > insert into t2 values (3, 1, null);
|
||||
// Query OK, 1 row affected
|
||||
// > insert into t2 values (4, null, null);
|
||||
// Query OK, 1 row affected
|
||||
// > select * from t2;
|
||||
// +----+--------+--------+
|
||||
// | id | a | b |
|
||||
// +----+--------+--------+
|
||||
// | 4 | <null> | <null> |
|
||||
// | 2 | <null> | 1 |
|
||||
// | 3 | 1 | <null> |
|
||||
// +----+--------+--------+
|
||||
for _, val := range vals {
|
||||
if val.IsNull() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getFKColumnsOffsets(tbInfo *model.TableInfo, cols []model.CIStr) ([]int, error) {
|
||||
colsOffsets := make([]int, len(cols))
|
||||
for i, col := range cols {
|
||||
offset := -1
|
||||
for i := range tbInfo.Columns {
|
||||
if tbInfo.Columns[i].Name.L == col.L {
|
||||
offset = tbInfo.Columns[i].Offset
|
||||
break
|
||||
}
|
||||
}
|
||||
if offset < 0 {
|
||||
return nil, table.ErrUnknownColumn.GenWithStackByArgs(col.L)
|
||||
}
|
||||
colsOffsets[i] = offset
|
||||
}
|
||||
return colsOffsets, nil
|
||||
}
|
||||
|
||||
type fkCheckKey struct {
|
||||
k kv.Key
|
||||
isPrefix bool
|
||||
}
|
||||
|
||||
func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
|
||||
if len(rows) == 0 {
|
||||
return nil
|
||||
}
|
||||
if fkc.checkRowsCache == nil {
|
||||
fkc.checkRowsCache = map[string]bool{}
|
||||
}
|
||||
fkCheckKeys := make([]*fkCheckKey, len(rows))
|
||||
prefetchKeys := make([]kv.Key, 0, len(rows))
|
||||
for i, r := range rows {
|
||||
if r.ignored {
|
||||
continue
|
||||
}
|
||||
vals, err := fkc.fetchFKValues(r.row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fkc.hasNullValue(vals) {
|
||||
continue
|
||||
}
|
||||
key, isPrefix, err := fkc.buildCheckKeyFromFKValue(sc, vals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fkCheckKeys[i] = &fkCheckKey{key, isPrefix}
|
||||
if !isPrefix {
|
||||
prefetchKeys = append(prefetchKeys, key)
|
||||
}
|
||||
}
|
||||
if len(prefetchKeys) > 0 {
|
||||
err := fkc.prefetchKeys(ctx, txn, prefetchKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
memBuffer := txn.GetMemBuffer()
|
||||
snap := txn.GetSnapshot()
|
||||
snap.SetOption(kv.ScanBatchSize, 2)
|
||||
defer func() {
|
||||
snap.SetOption(kv.ScanBatchSize, 256)
|
||||
}()
|
||||
for i, fkCheckKey := range fkCheckKeys {
|
||||
if fkCheckKey == nil {
|
||||
continue
|
||||
}
|
||||
k := fkCheckKey.k
|
||||
if ignore, ok := fkc.checkRowsCache[string(k)]; ok {
|
||||
if ignore {
|
||||
rows[i].ignored = true
|
||||
sc.AppendWarning(fkc.FailedErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
if fkCheckKey.isPrefix {
|
||||
err = fkc.checkPrefixKey(ctx, memBuffer, snap, k)
|
||||
} else {
|
||||
err = fkc.checkKey(ctx, txn, k)
|
||||
}
|
||||
if err != nil {
|
||||
rows[i].ignored = true
|
||||
sc.AppendWarning(fkc.FailedErr)
|
||||
fkc.checkRowsCache[string(k)] = true
|
||||
}
|
||||
fkc.checkRowsCache[string(k)] = false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
474
executor/foreign_key_test.go
Normal file
474
executor/foreign_key_test.go
Normal file
@ -0,0 +1,474 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package executor_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
plannercore "github.com/pingcap/tidb/planner/core"
|
||||
"github.com/pingcap/tidb/testkit"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var foreignKeyTestCase1 = []struct {
|
||||
prepareSQLs []string
|
||||
notNull bool
|
||||
}{
|
||||
// Case-1: test unique index only contain foreign key columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int, a int, b int, unique index(id), unique index(a, b));",
|
||||
"create table t2 (b int, name varchar(10), a int, id int, unique index(id), unique index (a,b), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
},
|
||||
// Case-2: test unique index contain foreign key columns and other columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int key, a int, b int, unique index(id), unique index(a, b, id));",
|
||||
"create table t2 (b int, a int, id int key, name varchar(10), unique index (a,b, id), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
},
|
||||
// Case-3: test non-unique index only contain foreign key columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int key,a int, b int, unique index(id), index(a, b));",
|
||||
"create table t2 (b int, a int, name varchar(10), id int key, index (a, b), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
},
|
||||
// Case-4: test non-unique index contain foreign key columns and other columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int key,a int, b int, unique index(id), index(a, b, id));",
|
||||
"create table t2 (name varchar(10), b int, a int, id int key, index (a, b, id), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
},
|
||||
//Case-5: test primary key only contain foreign key columns, and disable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=0;",
|
||||
"create table t1 (id int, a int, b int, unique index(id), primary key (a, b));",
|
||||
"create table t2 (b int, name varchar(10), a int, id int, unique index(id), primary key (a, b), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
notNull: true,
|
||||
},
|
||||
// Case-6: test primary key only contain foreign key columns, and enable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=1;",
|
||||
"create table t1 (id int, a int, b int, unique index(id), primary key (a, b));",
|
||||
"create table t2 (b int, a int, name varchar(10), id int, unique index(id), primary key (a, b), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
notNull: true,
|
||||
},
|
||||
// Case-7: test primary key contain foreign key columns and other column, and disable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=0;",
|
||||
"create table t1 (id int, a int, b int, unique index(id), primary key (a, b, id));",
|
||||
"create table t2 (b int, a int, id int, name varchar(10), unique index(id), primary key (a, b, id), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
notNull: true,
|
||||
},
|
||||
// Case-8: test primary key contain foreign key columns and other column, and enable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=1;",
|
||||
"create table t1 (id int, a int, b int, unique index(id), primary key (a, b, id));",
|
||||
"create table t2 (name varchar(10), b int, a int, id int, unique index(id), primary key (a, b, id), foreign key fk(a, b) references t1(a, b));",
|
||||
},
|
||||
notNull: true,
|
||||
},
|
||||
}
|
||||
|
||||
func TestForeignKeyOnInsertChildTable(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
tk.MustExec("create table t_data (id int, a int, b int)")
|
||||
tk.MustExec("insert into t_data (id, a, b) values (1, 1, 1), (2, 2, 2);")
|
||||
for _, ca := range foreignKeyTestCase1 {
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
for _, sql := range ca.prepareSQLs {
|
||||
tk.MustExec(sql)
|
||||
}
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 1, 1);")
|
||||
tk.MustExec("insert into t2 (id, a, b) values (1, 1, 1)")
|
||||
if !ca.notNull {
|
||||
tk.MustExec("insert into t2 (id, a, b) values (2, null, 1)")
|
||||
tk.MustExec("insert into t2 (id, a, b) values (3, 1, null)")
|
||||
tk.MustExec("insert into t2 (id, a, b) values (4, null, null)")
|
||||
}
|
||||
tk.MustGetDBError("insert into t2 (id, a, b) values (5, 1, 0);", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustGetDBError("insert into t2 (id, a, b) values (6, 0, 1);", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustGetDBError("insert into t2 (id, a, b) values (7, 2, 2);", plannercore.ErrNoReferencedRow2)
|
||||
// Test insert from select.
|
||||
tk.MustExec("delete from t2")
|
||||
tk.MustExec("insert into t2 (id, a, b) select id, a, b from t_data where t_data.id=1")
|
||||
tk.MustGetDBError("insert into t2 (id, a, b) select id, a, b from t_data where t_data.id=2", plannercore.ErrNoReferencedRow2)
|
||||
|
||||
// Test in txn
|
||||
tk.MustExec("delete from t2")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("delete from t1 where a=1")
|
||||
tk.MustGetDBError("insert into t2 (id, a, b) values (1, 1, 1)", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustExec("insert into t1 (id, a, b) values (2, 2, 2)")
|
||||
tk.MustExec("insert into t2 (id, a, b) values (2, 2, 2)")
|
||||
tk.MustExec("rollback")
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 1 1"))
|
||||
tk.MustQuery("select id, a, b from t2 order by id").Check(testkit.Rows())
|
||||
}
|
||||
|
||||
// Case-10: test primary key is handle and contain foreign key column, and foreign key column has default value.
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
tk.MustExec("set @@tidb_enable_clustered_index=0;")
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
tk.MustExec("create table t1 (id int,a int, primary key(id));")
|
||||
tk.MustExec("create table t2 (id int key,a int not null default 0, index (a), foreign key fk(a) references t1(id));")
|
||||
tk.MustExec("insert into t1 values (1, 1);")
|
||||
tk.MustExec("insert into t2 values (1, 1);")
|
||||
tk.MustGetDBError("insert into t2 (id) values (10);", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustGetDBError("insert into t2 values (3, 2);", plannercore.ErrNoReferencedRow2)
|
||||
|
||||
// Case-11: test primary key is handle and contain foreign key column, and foreign key column doesn't have default value.
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("create table t2 (id int key,a int, index (a), foreign key fk(a) references t1(id));")
|
||||
tk.MustExec("insert into t2 values (1, 1);")
|
||||
tk.MustExec("insert into t2 (id) values (10);")
|
||||
tk.MustGetDBError("insert into t2 values (3, 2);", plannercore.ErrNoReferencedRow2)
|
||||
}
|
||||
|
||||
func TestForeignKeyOnInsertDuplicateUpdateChildTable(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
for _, ca := range foreignKeyTestCase1 {
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
for _, sql := range ca.prepareSQLs {
|
||||
tk.MustExec(sql)
|
||||
}
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a')")
|
||||
|
||||
sqls := []string{
|
||||
"insert into t2 (id, a, b, name) values (1, 12, 22, 'b') on duplicate key update a = 100",
|
||||
"insert into t2 (id, a, b, name) values (1, 13, 23, 'c') on duplicate key update a = a+10",
|
||||
"insert into t2 (id, a, b, name) values (1, 14, 24, 'd') on duplicate key update a = a + 100",
|
||||
"insert into t2 (id, a, b, name) values (1, 14, 24, 'd') on duplicate key update a = 12, b = 23",
|
||||
}
|
||||
for _, sqlStr := range sqls {
|
||||
tk.MustGetDBError(sqlStr, plannercore.ErrNoReferencedRow2)
|
||||
}
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 14, 26, 'b') on duplicate key update a = 12, b = 22, name = 'x'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 12 22 x"))
|
||||
if !ca.notNull {
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 14, 26, 'b') on duplicate key update a = null, b = 22, name = 'y'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 <nil> 22 y"))
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 15, 26, 'b') on duplicate key update b = null, name = 'z'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 <nil> <nil> z"))
|
||||
}
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 15, 26, 'b') on duplicate key update a=13,b=23, name = 'c'")
|
||||
tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("1 13 23 c"))
|
||||
|
||||
// Test In txn.
|
||||
tk.MustExec("delete from t2")
|
||||
tk.MustExec("delete from t1")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (2, 11, 21, 'a')")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (2, 14, 26, 'b') on duplicate key update a = 12, b = 22, name = 'x'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("2 12 22 x"))
|
||||
tk.MustExec("rollback")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("2 11 21 a"))
|
||||
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("delete from t1 where id=3")
|
||||
tk.MustGetDBError("insert into t2 (id, a, b, name) values (2, 13, 23, 'y') on duplicate key update a = 13, b = 23, name = 'y'", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (2, 14, 24, 'z') on duplicate key update a = 14, b = 24, name = 'z'")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (2, 15, 25, 'o') on duplicate key update a = 15, b = 25, name = 'o'")
|
||||
tk.MustExec("delete from t1 where id=1")
|
||||
tk.MustGetDBError("insert into t2 (id, a, b, name) values (2, 11, 21, 'y') on duplicate key update a = 11, b = 21, name = 'p'", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustExec("commit")
|
||||
tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("2 15 25 o"))
|
||||
}
|
||||
|
||||
// Case-9: test primary key is handle and contain foreign key column.
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
tk.MustExec("set @@tidb_enable_clustered_index=0;")
|
||||
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
|
||||
tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')")
|
||||
|
||||
tk.MustExec("insert into t2 (id, a) values (11, 1) on duplicate key update a = 2, name = 'b'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 2 21 b"))
|
||||
tk.MustExec("insert into t2 (id, a, b) values (11, 2, 22) on duplicate key update a = 3, name = 'c'")
|
||||
tk.MustExec("insert into t2 (id, a, name) values (11, 3, 'b') on duplicate key update b = b+10, name = 'd'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 3 31 d"))
|
||||
tk.MustExec("insert into t2 (id, a, name) values (11, 3, 'b') on duplicate key update id = 1, name = 'f'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 31 f"))
|
||||
tk.MustGetDBError("insert into t2 (id, a, name) values (1, 3, 'b') on duplicate key update a = 10", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 31 f"))
|
||||
|
||||
// Test In txn.
|
||||
tk.MustExec("delete from t2")
|
||||
tk.MustExec("delete from t1")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 1, 21, 'a')")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t2 (id, a) values (11, 1) on duplicate key update a = 2, name = 'b'")
|
||||
tk.MustExec("rollback")
|
||||
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("delete from t1 where id=2")
|
||||
tk.MustGetDBError("insert into t2 (id, a) values (1, 1) on duplicate key update a = 2, name = 'b'", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustExec("insert into t2 (id, a) values (1, 1) on duplicate key update a = 3, name = 'c'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 21 c"))
|
||||
tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)")
|
||||
tk.MustExec("insert into t2 (id, a) values (3, 3) on duplicate key update a = 5, name = 'd'")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 5 21 d"))
|
||||
tk.MustExec("delete from t1 where id=1")
|
||||
tk.MustGetDBError("insert into t2 (id, a) values (1, 5) on duplicate key update a = 1, name = 'e'", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustExec("commit")
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 5 21 d"))
|
||||
}
|
||||
|
||||
func TestForeignKeyCheckAndLock(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
tk2 := testkit.NewTestKit(t, store)
|
||||
tk2.MustExec("set @@foreign_key_checks=1")
|
||||
tk2.MustExec("use test")
|
||||
|
||||
cases := []struct {
|
||||
prepareSQLs []string
|
||||
}{
|
||||
// Case-1: test unique index only contain foreign key columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int, name varchar(10), unique index (id))",
|
||||
"create table t2 (a int, name varchar(10), unique index (a), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-2: test unique index contain foreign key columns and other columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int, name varchar(10), unique index (id, name))",
|
||||
"create table t2 (name varchar(10), a int, unique index (a, name), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-3: test non-unique index only contain foreign key columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int, name varchar(10), index (id))",
|
||||
"create table t2 (a int, name varchar(10), index (a), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-4: test non-unique index contain foreign key columns and other columns.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"create table t1 (id int, name varchar(10), index (id, name))",
|
||||
"create table t2 (name varchar(10), a int, index (a, name), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-5: test primary key only contain foreign key columns, and disable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=0;",
|
||||
"create table t1 (id int, name varchar(10), primary key (id))",
|
||||
"create table t2 (a int, name varchar(10), primary key (a), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-6: test primary key only contain foreign key columns, and enable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=1;",
|
||||
"create table t1 (id int, name varchar(10), primary key (id))",
|
||||
"create table t2 (a int, name varchar(10), primary key (a), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
//Case-7: test primary key contain foreign key columns and other column, and disable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=0;",
|
||||
"create table t1 (id int, name varchar(10), primary key (id, name))",
|
||||
"create table t2 (a int, name varchar(10), primary key (a , name), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
// Case-8: test primary key contain foreign key columns and other column, and enable tidb_enable_clustered_index.
|
||||
{
|
||||
prepareSQLs: []string{
|
||||
"set @@tidb_enable_clustered_index=1;",
|
||||
"create table t1 (id int, name varchar(10), primary key (id, name))",
|
||||
"create table t2 (a int, name varchar(10), primary key (a , name), foreign key fk(a) references t1(id))",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, ca := range cases {
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
for _, sql := range ca.prepareSQLs {
|
||||
tk.MustExec(sql)
|
||||
}
|
||||
// Test in optimistic txn
|
||||
tk.MustExec("insert into t1 (id, name) values (1, 'a');")
|
||||
// Test insert child table
|
||||
tk.MustExec("begin optimistic")
|
||||
tk.MustExec("insert into t2 (a, name) values (1, 'a');")
|
||||
tk2.MustExec("delete from t1 where id = 1")
|
||||
err := tk.ExecToErr("commit")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "Write conflict")
|
||||
}
|
||||
}
|
||||
|
||||
func TestForeignKeyOnInsertIgnore(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
tk.MustExec("CREATE TABLE t1 (i INT PRIMARY KEY);")
|
||||
tk.MustExec("CREATE TABLE t2 (i INT, FOREIGN KEY (i) REFERENCES t1 (i));")
|
||||
tk.MustExec("INSERT INTO t1 VALUES (1),(3);")
|
||||
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1),(2),(3),(4);")
|
||||
warning := "Warning 1452 Cannot add or update a child row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk_1` FOREIGN KEY (`i`) REFERENCES `t1` (`i`))"
|
||||
tk.MustQuery("show warnings;").Check(testkit.Rows(warning, warning))
|
||||
tk.MustQuery("select * from t2").Check(testkit.Rows("1", "3"))
|
||||
}
|
||||
|
||||
func TestForeignKeyOnInsertOnDuplicateParentTableCheck(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
for _, ca := range foreignKeyTestCase1 {
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
for _, sql := range ca.prepareSQLs {
|
||||
tk.MustExec(sql)
|
||||
}
|
||||
if !ca.notNull {
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24), (5, 15, null), (6, null, 26), (7, null, null);")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a'), (5, 15, null, 'e'), (6, null, 26, 'f'), (7, null, null, 'g');")
|
||||
|
||||
tk.MustExec("insert into t1 (id, a) values (2, 12) on duplicate key update a=a+100, b=b+200")
|
||||
tk.MustExec("insert into t1 (id, a) values (3, 13), (2, 12) on duplicate key update a=a+1000, b=b+2000")
|
||||
tk.MustExec("insert into t1 (id) values (5), (6), (7) on duplicate key update a=a+10000, b=b+20000")
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 <nil>", "6 <nil> 20026", "7 <nil> <nil>"))
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 <nil> e", "6 <nil> 26 f", "7 <nil> <nil> g"))
|
||||
|
||||
tk.MustGetDBError("insert into t1 (id, a) values (1, 11) on duplicate key update a=a+10, b=b+20", plannercore.ErrRowIsReferenced2)
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 <nil>", "6 <nil> 20026", "7 <nil> <nil>"))
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 <nil> e", "6 <nil> 26 f", "7 <nil> <nil> g"))
|
||||
} else {
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a');")
|
||||
|
||||
tk.MustExec("insert into t1 (id, a, b) values (2, 12, 22) on duplicate key update a=a+100, b=b+200")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (3, 13, 23), (2, 12, 22) on duplicate key update a=a+1000, b=b+2000")
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24"))
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a"))
|
||||
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21) on duplicate key update id=11")
|
||||
tk.MustGetDBError("insert into t1 (id, a, b) values (1, 11, 21) on duplicate key update a=a+10, b=b+20", plannercore.ErrRowIsReferenced2)
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("2 1112 2222", "3 1013 2023", "4 14 24", "11 11 21"))
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a"))
|
||||
}
|
||||
}
|
||||
|
||||
// Case-9: test primary key is handle and contain foreign key column.
|
||||
tk.MustExec("drop table if exists t2;")
|
||||
tk.MustExec("drop table if exists t1;")
|
||||
tk.MustExec("set @@tidb_enable_clustered_index=0;")
|
||||
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
|
||||
tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
|
||||
tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')")
|
||||
|
||||
tk.MustExec("insert into t1 (id, a, b) values (2, 0, 0), (3, 0, 0) on duplicate key update id=id+100")
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "4 14 24", "102 12 22", "103 13 23"))
|
||||
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 0, 0) on duplicate key update a=a+100")
|
||||
tk.MustGetDBError("insert into t1 (id, a, b) values (1, 0, 0) on duplicate key update id=100+id", plannercore.ErrRowIsReferenced2)
|
||||
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 111 21", "4 14 24", "102 12 22", "103 13 23"))
|
||||
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 1 21 a"))
|
||||
}
|
||||
|
||||
func TestForeignKey(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
|
||||
// Test table has more than 1 foreign keys.
|
||||
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
|
||||
tk.MustExec("create table t2 (id int, a int, b int, primary key (id));")
|
||||
tk.MustExec("create table t3 (b int, a int, id int, primary key (a), foreign key (a) references t1(id), foreign key (b) references t2(id));")
|
||||
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 111), (2, 22, 222);")
|
||||
tk.MustExec("insert into t2 (id, a, b) values (2, 22, 222);")
|
||||
tk.MustGetDBError("insert into t3 (id, a, b) values (1, 1, 1)", plannercore.ErrNoReferencedRow2)
|
||||
tk.MustGetDBError("insert into t3 (id, a, b) values (2, 3, 2)", plannercore.ErrNoReferencedRow2)
|
||||
}
|
||||
|
||||
func TestForeignKeyConcurrentInsertChildTable(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("create table t1 (id int, a int, primary key (id));")
|
||||
tk.MustExec("create table t2 (id int, a int, index(a), foreign key fk(a) references t1(id));")
|
||||
tk.MustExec("insert into t1 (id, a) values (1, 11),(2, 12), (3, 13), (4, 14)")
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
|
||||
tk.MustExec("set @@foreign_key_checks=1")
|
||||
tk.MustExec("use test")
|
||||
for cnt := 0; cnt < 20; cnt++ {
|
||||
id := cnt%4 + 1
|
||||
sql := fmt.Sprintf("insert into t2 (id, a) values (%v, %v)", cnt, id)
|
||||
tk.MustExec(sql)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
@ -420,7 +420,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
|
||||
}
|
||||
|
||||
newData := e.row4Update[:len(oldRow)]
|
||||
_, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker)
|
||||
_, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -448,3 +448,8 @@ func (e *InsertExec) setMessage() {
|
||||
stmtCtx.SetMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
// GetFKChecks implements WithForeignKeyTrigger interface.
|
||||
func (e *InsertExec) GetFKChecks() []*FKCheckExec {
|
||||
return e.fkChecks
|
||||
}
|
||||
|
||||
@ -97,6 +97,8 @@ type InsertValues struct {
|
||||
// We use mutex to protect routine from using invalid txn.
|
||||
isLoadData bool
|
||||
txnInUse sync.Mutex
|
||||
// fkChecks contains the foreign key checkers.
|
||||
fkChecks []*FKCheckExec
|
||||
}
|
||||
|
||||
type defaultVal struct {
|
||||
@ -1126,6 +1128,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
|
||||
defer snapshot.SetOption(kv.CollectRuntimeStats, nil)
|
||||
}
|
||||
}
|
||||
sc := e.ctx.GetSessionVars().StmtCtx
|
||||
for _, fkc := range e.fkChecks {
|
||||
err = fkc.checkRows(ctx, sc, txn, toBeCheckedRows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
prefetchStart := time.Now()
|
||||
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
|
||||
// Temporary table need not to do prefetch because its all data are stored in the memory.
|
||||
@ -1265,6 +1274,14 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
|
||||
if e.lastInsertID != 0 {
|
||||
vars.SetLastInsertID(e.lastInsertID)
|
||||
}
|
||||
if !vars.StmtCtx.BatchCheck {
|
||||
for _, fkc := range e.fkChecks {
|
||||
err = fkc.insertRowNeedToCheck(vars.StmtCtx, row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -191,7 +191,7 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n
|
||||
flags := bAssignFlag[content.Start:content.End]
|
||||
|
||||
// Update row
|
||||
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker)
|
||||
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, nil)
|
||||
if err1 == nil {
|
||||
_, exist := e.updatedRowKeys[content.Start].Get(handle)
|
||||
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
|
||||
|
||||
@ -51,7 +51,7 @@ var (
|
||||
// 1. changed (bool) : does the update really change the row values. e.g. update set i = 1 where i = 1;
|
||||
// 2. err (error) : error in the update.
|
||||
func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table,
|
||||
onDup bool, memTracker *memory.Tracker) (bool, error) {
|
||||
onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec) (bool, error) {
|
||||
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
|
||||
span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context()))
|
||||
defer span1.Finish()
|
||||
@ -214,6 +214,12 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
for _, fkt := range fkChecks {
|
||||
err := fkt.updateRowNeedToCheck(sc, oldData, newData)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
if onDup {
|
||||
sc.AddAffectedRows(2)
|
||||
} else {
|
||||
|
||||
@ -91,6 +91,8 @@ const (
|
||||
RequestSourceType
|
||||
// ReplicaReadAdjuster set the adjust function of cop requsts.
|
||||
ReplicaReadAdjuster
|
||||
// ScanBatchSize set the iter scan batch size.
|
||||
ScanBatchSize
|
||||
)
|
||||
|
||||
// ReplicaReadType is the type of replica to read data from
|
||||
|
||||
@ -1566,7 +1566,7 @@ func (fk *FKInfo) String(db, tb string) string {
|
||||
buf.WriteString(fk.Name.O + "` FOREIGN KEY (")
|
||||
for i, col := range fk.Cols {
|
||||
if i > 0 {
|
||||
buf.WriteByte(byte(','))
|
||||
buf.WriteString(", ")
|
||||
}
|
||||
buf.WriteString("`" + col.O + "`")
|
||||
}
|
||||
@ -1579,7 +1579,7 @@ func (fk *FKInfo) String(db, tb string) string {
|
||||
buf.WriteString("` (")
|
||||
for i, col := range fk.RefCols {
|
||||
if i > 0 {
|
||||
buf.WriteByte(byte(','))
|
||||
buf.WriteString(", ")
|
||||
}
|
||||
buf.WriteString("`" + col.O + "`")
|
||||
}
|
||||
|
||||
@ -323,6 +323,8 @@ type Insert struct {
|
||||
AllAssignmentsAreConstant bool
|
||||
|
||||
RowLen int
|
||||
|
||||
FKChecks []*FKCheck
|
||||
}
|
||||
|
||||
// Update represents Update plan.
|
||||
|
||||
@ -111,8 +111,10 @@ var (
|
||||
ErrKeyPart0 = dbterror.ClassOptimizer.NewStd(mysql.ErrKeyPart0)
|
||||
ErrGettingNoopVariable = dbterror.ClassOptimizer.NewStd(mysql.ErrGettingNoopVariable)
|
||||
|
||||
ErrPrepareMulti = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareMulti)
|
||||
ErrUnsupportedPs = dbterror.ClassExecutor.NewStd(mysql.ErrUnsupportedPs)
|
||||
ErrPsManyParam = dbterror.ClassExecutor.NewStd(mysql.ErrPsManyParam)
|
||||
ErrPrepareDDL = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareDDL)
|
||||
ErrPrepareMulti = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareMulti)
|
||||
ErrUnsupportedPs = dbterror.ClassExecutor.NewStd(mysql.ErrUnsupportedPs)
|
||||
ErrPsManyParam = dbterror.ClassExecutor.NewStd(mysql.ErrPsManyParam)
|
||||
ErrPrepareDDL = dbterror.ClassExecutor.NewStd(mysql.ErrPrepareDDL)
|
||||
ErrRowIsReferenced2 = dbterror.ClassOptimizer.NewStd(mysql.ErrRowIsReferenced2)
|
||||
ErrNoReferencedRow2 = dbterror.ClassOptimizer.NewStd(mysql.ErrNoReferencedRow2)
|
||||
)
|
||||
|
||||
176
planner/core/foreign_key.go
Normal file
176
planner/core/foreign_key.go
Normal file
@ -0,0 +1,176 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/parser/mysql"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/table"
|
||||
)
|
||||
|
||||
// FKCheck indicates the foreign key constraint checker.
|
||||
type FKCheck struct {
|
||||
FK *model.FKInfo
|
||||
ReferredFK *model.ReferredFKInfo
|
||||
Tbl table.Table
|
||||
Idx table.Index
|
||||
Cols []model.CIStr
|
||||
|
||||
IdxIsPrimaryKey bool
|
||||
IdxIsExclusive bool
|
||||
|
||||
CheckExist bool
|
||||
FailedErr error
|
||||
}
|
||||
|
||||
func (p *Insert) buildOnInsertFKChecks(ctx sessionctx.Context, is infoschema.InfoSchema, dbName string) ([]*FKCheck, error) {
|
||||
if !ctx.GetSessionVars().ForeignKeyChecks {
|
||||
return nil, nil
|
||||
}
|
||||
tblInfo := p.Table.Meta()
|
||||
fkChecks := make([]*FKCheck, 0, len(tblInfo.ForeignKeys))
|
||||
updateCols := p.buildOnDuplicateUpdateColumns()
|
||||
if len(updateCols) > 0 {
|
||||
referredFKChecks, err := buildOnUpdateReferredFKChecks(is, dbName, tblInfo, updateCols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(referredFKChecks) > 0 {
|
||||
fkChecks = append(fkChecks, referredFKChecks...)
|
||||
}
|
||||
}
|
||||
for _, fk := range tblInfo.ForeignKeys {
|
||||
if fk.Version < 1 {
|
||||
continue
|
||||
}
|
||||
failedErr := ErrNoReferencedRow2.FastGenByArgs(fk.String(dbName, tblInfo.Name.L))
|
||||
fkCheck, err := buildFKCheckOnModifyChildTable(is, fk, failedErr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fkCheck != nil {
|
||||
fkChecks = append(fkChecks, fkCheck)
|
||||
}
|
||||
}
|
||||
return fkChecks, nil
|
||||
}
|
||||
|
||||
func (p *Insert) buildOnDuplicateUpdateColumns() map[string]struct{} {
|
||||
m := make(map[string]struct{})
|
||||
for _, assign := range p.OnDuplicate {
|
||||
m[assign.ColName.L] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func buildOnUpdateReferredFKChecks(is infoschema.InfoSchema, dbName string, tblInfo *model.TableInfo, updateCols map[string]struct{}) ([]*FKCheck, error) {
|
||||
referredFKs := is.GetTableReferredForeignKeys(dbName, tblInfo.Name.L)
|
||||
fkChecks := make([]*FKCheck, 0, len(referredFKs))
|
||||
for _, referredFK := range referredFKs {
|
||||
if !isMapContainAnyCols(updateCols, referredFK.Cols...) {
|
||||
continue
|
||||
}
|
||||
fkCheck, err := buildFKCheckOnModifyReferTable(is, referredFK)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fkCheck != nil {
|
||||
fkChecks = append(fkChecks, fkCheck)
|
||||
}
|
||||
}
|
||||
return fkChecks, nil
|
||||
}
|
||||
|
||||
func isMapContainAnyCols(colsMap map[string]struct{}, cols ...model.CIStr) bool {
|
||||
for _, col := range cols {
|
||||
_, exist := colsMap[col.L]
|
||||
if exist {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func buildFKCheckOnModifyChildTable(is infoschema.InfoSchema, fk *model.FKInfo, failedErr error) (*FKCheck, error) {
|
||||
referTable, err := is.TableByName(fk.RefSchema, fk.RefTable)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
fkCheck, err := buildFKCheck(referTable, fk.RefCols, failedErr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fkCheck.CheckExist = true
|
||||
fkCheck.FK = fk
|
||||
return fkCheck, nil
|
||||
}
|
||||
|
||||
func buildFKCheckOnModifyReferTable(is infoschema.InfoSchema, referredFK *model.ReferredFKInfo) (*FKCheck, error) {
|
||||
childTable, err := is.TableByName(referredFK.ChildSchema, referredFK.ChildTable)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
fk := model.FindFKInfoByName(childTable.Meta().ForeignKeys, referredFK.ChildFKName.L)
|
||||
if fk == nil || fk.Version < 1 {
|
||||
return nil, nil
|
||||
}
|
||||
failedErr := ErrRowIsReferenced2.GenWithStackByArgs(fk.String(referredFK.ChildSchema.L, referredFK.ChildTable.L))
|
||||
fkCheck, err := buildFKCheck(childTable, fk.Cols, failedErr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fkCheck.CheckExist = false
|
||||
fkCheck.ReferredFK = referredFK
|
||||
return fkCheck, nil
|
||||
}
|
||||
|
||||
func buildFKCheck(tbl table.Table, cols []model.CIStr, failedErr error) (*FKCheck, error) {
|
||||
tblInfo := tbl.Meta()
|
||||
if tblInfo.PKIsHandle && len(cols) == 1 {
|
||||
refColInfo := model.FindColumnInfo(tblInfo.Columns, cols[0].L)
|
||||
if refColInfo != nil && mysql.HasPriKeyFlag(refColInfo.GetFlag()) {
|
||||
return &FKCheck{
|
||||
Tbl: tbl,
|
||||
IdxIsPrimaryKey: true,
|
||||
IdxIsExclusive: true,
|
||||
FailedErr: failedErr,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
referTbIdxInfo := model.FindIndexByColumns(tblInfo, cols...)
|
||||
if referTbIdxInfo == nil {
|
||||
return nil, failedErr
|
||||
}
|
||||
var tblIdx table.Index
|
||||
for _, idx := range tbl.Indices() {
|
||||
if idx.Meta().ID == referTbIdxInfo.ID {
|
||||
tblIdx = idx
|
||||
}
|
||||
}
|
||||
if tblIdx == nil {
|
||||
return nil, failedErr
|
||||
}
|
||||
|
||||
return &FKCheck{
|
||||
Tbl: tbl,
|
||||
Idx: tblIdx,
|
||||
IdxIsExclusive: len(cols) == len(referTbIdxInfo.Columns),
|
||||
IdxIsPrimaryKey: referTbIdxInfo.Primary && tblInfo.IsCommonHandle,
|
||||
FailedErr: failedErr,
|
||||
}, nil
|
||||
}
|
||||
@ -3590,6 +3590,10 @@ func (b *PlanBuilder) buildInsert(ctx context.Context, insert *ast.InsertStmt) (
|
||||
}
|
||||
|
||||
err = insertPlan.ResolveIndices()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
insertPlan.FKChecks, err = insertPlan.buildOnInsertFKChecks(b.ctx, b.is, tn.DBInfo.Name.L)
|
||||
return insertPlan, err
|
||||
}
|
||||
|
||||
|
||||
@ -130,6 +130,11 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
|
||||
s.KVSnapshot.SetRequestSourceType(val.(string))
|
||||
case kv.ReplicaReadAdjuster:
|
||||
s.KVSnapshot.SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
|
||||
case kv.ScanBatchSize:
|
||||
size := val.(int)
|
||||
if size > 0 {
|
||||
s.KVSnapshot.SetScanBatchSize(size)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user