274 lines
8.9 KiB
Go
274 lines
8.9 KiB
Go
// Copyright 2018 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package executor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/pingcap/parser/mysql"
|
|
"github.com/pingcap/tidb/expression"
|
|
"github.com/pingcap/tidb/kv"
|
|
"github.com/pingcap/tidb/table/tables"
|
|
"github.com/pingcap/tidb/tablecodec"
|
|
"github.com/pingcap/tidb/types"
|
|
"github.com/pingcap/tidb/util/chunk"
|
|
"github.com/pingcap/tidb/util/logutil"
|
|
"github.com/pingcap/tidb/util/stringutil"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// InsertExec represents an insert executor.
|
|
type InsertExec struct {
|
|
*InsertValues
|
|
OnDuplicate []*expression.Assignment
|
|
Priority mysql.PriorityEnum
|
|
}
|
|
|
|
func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
|
|
logutil.Eventf(ctx, "insert %d rows into table `%s`", len(rows), stringutil.MemoizeStr(func() string {
|
|
var tblName string
|
|
if meta := e.Table.Meta(); meta != nil {
|
|
tblName = meta.Name.L
|
|
}
|
|
return tblName
|
|
}))
|
|
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
|
|
sessVars := e.ctx.GetSessionVars()
|
|
defer sessVars.CleanBuffers()
|
|
ignoreErr := sessVars.StmtCtx.DupKeyAsWarning
|
|
|
|
if !sessVars.LightningMode {
|
|
txn, err := e.ctx.Txn(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap)
|
|
}
|
|
|
|
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(rows)))
|
|
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
|
|
// For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in
|
|
// the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs.
|
|
// However, if the `on duplicate update` is also specified, the duplicated row will be updated.
|
|
// Using BatchGet in insert ignore to mark rows as duplicated before we add records to the table.
|
|
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
|
|
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
|
|
if len(e.OnDuplicate) > 0 {
|
|
err := e.batchUpdateDupRows(ctx, rows)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else if ignoreErr {
|
|
err := e.batchCheckAndInsert(ctx, rows, e.addRecord)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
for _, row := range rows {
|
|
if _, err := e.addRecord(ctx, row); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
|
|
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
|
|
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Batch get the to-be-updated rows in storage.
|
|
err = e.initDupOldRowValue(ctx, e.ctx, e.Table, newRows)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i, r := range e.toBeCheckedRows {
|
|
if r.handleKey != nil {
|
|
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
|
|
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
for _, uk := range r.uniqueKeys {
|
|
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
|
|
handle, err := tables.DecodeHandle(val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newRows[i] = nil
|
|
break
|
|
}
|
|
}
|
|
// If row was checked with no duplicate keys,
|
|
// we should do insert the row,
|
|
// and key-values should be filled back to dupOldRowValues for the further row check,
|
|
// due to there may be duplicate keys inside the insert statement.
|
|
if newRows[i] != nil {
|
|
newHandle, err := e.addRecord(ctx, newRows[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.fillBackKeys(e.Table, r, newHandle)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Next implements the Executor Next interface.
|
|
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
|
|
req.Reset()
|
|
if len(e.children) > 0 && e.children[0] != nil {
|
|
return insertRowsFromSelect(ctx, e)
|
|
}
|
|
return insertRows(ctx, e)
|
|
}
|
|
|
|
// Close implements the Executor Close interface.
|
|
func (e *InsertExec) Close() error {
|
|
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
|
|
e.setMessage()
|
|
if e.SelectExec != nil {
|
|
return e.SelectExec.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Open implements the Executor Open interface.
|
|
func (e *InsertExec) Open(ctx context.Context) error {
|
|
if e.SelectExec != nil {
|
|
return e.SelectExec.Open(ctx)
|
|
}
|
|
e.initEvalBuffer()
|
|
return nil
|
|
}
|
|
|
|
// updateDupRow updates a duplicate row to a new row.
|
|
func (e *InsertExec) updateDupRow(ctx context.Context, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
|
|
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
|
|
span1 := span.Tracer().StartSpan("InsertExec.updateDupRow", opentracing.ChildOf(span.Context()))
|
|
defer span1.Finish()
|
|
ctx = opentracing.ContextWithSpan(ctx, span1)
|
|
}
|
|
|
|
oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
|
|
if err != nil {
|
|
logutil.BgLogger().Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
|
|
return err
|
|
}
|
|
// Do update row.
|
|
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(ctx, handle, oldRow, row.row, onDuplicate)
|
|
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
|
|
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return e.updateDupKeyValues(ctx, handle, newHandle, handleChanged, oldRow, updatedRow)
|
|
}
|
|
|
|
// doDupRowUpdate updates the duplicate row.
|
|
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
|
|
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
|
|
assignFlag := make([]bool, len(e.Table.WritableCols()))
|
|
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
|
|
e.ctx.GetSessionVars().CurrInsertValues = chunk.MutRowFromDatums(newRow).ToRow()
|
|
|
|
// NOTE: In order to execute the expression inside the column assignment,
|
|
// we have to put the value of "oldRow" before "newRow" in "row4Update" to
|
|
// be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
|
|
row4Update := make([]types.Datum, 0, len(oldRow)+len(newRow))
|
|
row4Update = append(row4Update, oldRow...)
|
|
row4Update = append(row4Update, newRow...)
|
|
|
|
// Update old row when the key is duplicated.
|
|
for _, col := range cols {
|
|
val, err1 := col.Expr.Eval(chunk.MutRowFromDatums(row4Update).ToRow())
|
|
if err1 != nil {
|
|
return nil, false, 0, err1
|
|
}
|
|
row4Update[col.Col.Index] = val
|
|
assignFlag[col.Col.Index] = true
|
|
}
|
|
|
|
newData := row4Update[:len(oldRow)]
|
|
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
|
|
if err != nil {
|
|
return nil, false, 0, err
|
|
}
|
|
return newData, handleChanged, newHandle, nil
|
|
}
|
|
|
|
// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
|
|
func (e *InsertExec) updateDupKeyValues(ctx context.Context, oldHandle int64, newHandle int64,
|
|
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
|
|
// There is only one row per update.
|
|
fillBackKeysInRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, [][]types.Datum{updatedRow})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Delete old keys and fill back new key-values of the updated row.
|
|
err = e.deleteDupKeys(ctx, e.ctx, e.Table, [][]types.Datum{oldRow})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if handleChanged {
|
|
delete(e.dupOldRowValues, string(e.Table.RecordKey(oldHandle)))
|
|
e.fillBackKeys(e.Table, fillBackKeysInRows[0], newHandle)
|
|
} else {
|
|
e.fillBackKeys(e.Table, fillBackKeysInRows[0], oldHandle)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// setMessage sets info message(ERR_INSERT_INFO) generated by INSERT statement
|
|
func (e *InsertExec) setMessage() {
|
|
stmtCtx := e.ctx.GetSessionVars().StmtCtx
|
|
numRecords := stmtCtx.RecordRows()
|
|
if e.SelectExec != nil || numRecords > 1 {
|
|
numWarnings := stmtCtx.WarningCount()
|
|
var numDuplicates uint64
|
|
if stmtCtx.DupKeyAsWarning {
|
|
// if ignoreErr
|
|
numDuplicates = numRecords - stmtCtx.CopiedRows()
|
|
} else {
|
|
if e.ctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
|
|
numDuplicates = stmtCtx.TouchedRows()
|
|
} else {
|
|
numDuplicates = stmtCtx.UpdatedRows()
|
|
}
|
|
}
|
|
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrInsertInfo], numRecords, numDuplicates, numWarnings)
|
|
stmtCtx.SetMessage(msg)
|
|
}
|
|
}
|