Files
tidb/executor/update.go

194 lines
4.9 KiB
Go

// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)
// UpdateExec represents an update executor.
//
// On the first call to Next it buffers every row produced by children[0]
// together with that row's new values (see fetchChunkRows), then writes the
// buffered rows back one at a time (see exec).
type UpdateExec struct {
baseExecutor
// SelectExec is the executor that Open and Close delegate to.
SelectExec Executor
// OrderedList holds the assignments; composeNewRow evaluates them in slice
// order against each fetched row.
OrderedList []*expression.Assignment
// updatedRowKeys is a map for unique (Table, handle) pair:
// table ID -> set of handles that updateRecord reported as changed.
// It is allocated lazily by exec.
updatedRowKeys map[int64]map[int64]struct{}
// tblID2table maps a table ID to the table.Table passed to updateRecord.
tblID2table map[int64]table.Table
rows []types.DatumRow // The rows fetched from children[0], in fetch order.
newRowsData []types.DatumRow // The new values to be set; newRowsData[i] belongs to rows[i].
fetched bool // Set by Next once fetchChunkRows has succeeded.
cursor int // Index into rows/newRowsData of the next row exec will process.
}
// exec writes back the buffered row at e.cursor and then advances the cursor.
//
// For every table in schema.TblID2Handle and every handle column of that
// table, the slice of the row belonging to the table is passed to
// updateRecord, unless the (table ID, handle) pair is already recorded in
// e.updatedRowKeys. A pair is recorded only when updateRecord reports that
// the row actually changed.
//
// It returns (nil, nil) once the cursor has moved past the last buffered row,
// and an empty, non-nil DatumRow after a row has been processed. A
// kv.ErrKeyExists error is downgraded to a statement warning when
// StmtCtx.DupKeyAsWarning is set; any other error aborts the call.
func (e *UpdateExec) exec(schema *expression.Schema) (types.DatumRow, error) {
	assigned, err := e.getUpdateColumns(schema.Len())
	if err != nil {
		return nil, errors.Trace(err)
	}
	if len(e.rows) <= e.cursor {
		// Nothing left to update.
		return nil, nil
	}
	if e.updatedRowKeys == nil {
		e.updatedRowKeys = make(map[int64]map[int64]struct{})
	}

	oldRow, newRow := e.rows[e.cursor], e.newRowsData[e.cursor]
	for tblID, handleCols := range schema.TblID2Handle {
		tbl := e.tblID2table[tblID]
		// seen aliases the per-table handle set, so writes through it are
		// visible in e.updatedRowKeys.
		seen := e.updatedRowKeys[tblID]
		if seen == nil {
			seen = make(map[int64]struct{})
			e.updatedRowKeys[tblID] = seen
		}

		for _, handleCol := range handleCols {
			begin := getTableOffset(schema, handleCol)
			end := begin + len(tbl.WritableCols())
			handle := oldRow[handleCol.Index].GetInt64()
			oldData, newData, flags := oldRow[begin:end], newRow[begin:end], assigned[begin:end]

			if _, done := seen[handle]; done {
				// Each matched row is updated once, even if it matches the conditions multiple times.
				continue
			}

			changed, _, _, _, updErr := updateRecord(e.ctx, handle, oldData, newData, flags, tbl, false)
			if updErr != nil {
				stmtCtx := e.ctx.GetSessionVars().StmtCtx
				if !kv.ErrKeyExists.Equal(updErr) || !stmtCtx.DupKeyAsWarning {
					return nil, errors.Trace(updErr)
				}
				// Duplicate key tolerated: report it as a warning and move on.
				stmtCtx.AppendWarning(updErr)
				continue
			}
			if changed {
				seen[handle] = struct{}{}
			}
		}
	}

	e.cursor++
	return types.DatumRow{}, nil
}
// Next implements the Executor Next interface.
//
// The chunk is always reset and never filled. On the first call all rows are
// fetched from the child and then written back by calling exec until it
// reports that no rows remain; later calls return immediately.
func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.fetched {
		return nil
	}

	if err := e.fetchChunkRows(ctx); err != nil {
		return errors.Trace(err)
	}
	e.fetched = true

	for {
		row, err := e.exec(e.children[0].Schema())
		if err != nil {
			return errors.Trace(err)
		}
		// A nil row means there is no more data waiting to be updated,
		// so the execution of UpdateExec is finished.
		if row == nil {
			return nil
		}
	}
}
// fetchChunkRows drains children[0] chunk by chunk until it yields an empty
// chunk. Every row is converted to a DatumRow and appended to e.rows, and the
// row with all assignments applied (see composeNewRow) is appended to
// e.newRowsData at the same position.
func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
	child := e.children[0]
	fields := child.retTypes()

	// rowCnt numbers rows across all chunks; composeNewRow receives it as the
	// row index.
	rowCnt := 0
	for {
		// A fresh chunk is requested for every round, as in the original flow.
		chk := child.newChunk()
		if err := child.Next(ctx, chk); err != nil {
			return errors.Trace(err)
		}

		numRows := chk.NumRows()
		if numRows == 0 {
			return nil
		}

		for i := 0; i < numRows; i++ {
			oldRow := chk.GetRow(i).GetDatumRow(fields)
			newRow, err := e.composeNewRow(rowCnt, oldRow)
			if err != nil {
				return errors.Trace(err)
			}
			e.rows = append(e.rows, oldRow)
			e.newRowsData = append(e.newRowsData, newRow)
			rowCnt++
		}
	}
}
// handleErr post-processes an error raised while evaluating an assignment.
// A nil error is passed through. types.ErrDataTooLong is rebuilt via
// resetErrDataTooLong with the column's original-case name and the 1-based
// row number (rowIdx is 0-based). Every other error is returned traced.
func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error {
	switch {
	case err == nil:
		return nil
	case types.ErrDataTooLong.Equal(err):
		return resetErrDataTooLong(colName.O, rowIdx+1, err)
	default:
		return errors.Trace(err)
	}
}
// composeNewRow returns a copy of oldRow with every assignment in
// e.OrderedList applied. Assignments are evaluated in order against the
// partially updated copy, so a later expression observes values written by
// earlier ones. rowIdx is only used for error reporting through handleErr.
func (e *UpdateExec) composeNewRow(rowIdx int, oldRow types.DatumRow) (types.DatumRow, error) {
	updated := oldRow.Copy()
	for _, asgn := range e.OrderedList {
		target := asgn.Col
		val, evalErr := asgn.Expr.Eval(updated)
		if evalErr = e.handleErr(target.ColName, rowIdx, evalErr); evalErr != nil {
			return nil, errors.Trace(evalErr)
		}
		updated[target.Index] = val
	}
	return updated, nil
}
// Close implements the Executor Close interface.
// It closes SelectExec and returns that call's result unchanged.
func (e *UpdateExec) Close() error {
	src := e.SelectExec
	return src.Close()
}
// Open implements the Executor Open interface.
// It opens SelectExec with ctx and returns that call's result unchanged.
func (e *UpdateExec) Open(ctx context.Context) error {
	src := e.SelectExec
	return src.Open(ctx)
}
// getUpdateColumns builds a schemaLen-long flag slice in which position i is
// true when some assignment in e.OrderedList targets the column with Index i.
// The returned error is always nil.
func (e *UpdateExec) getUpdateColumns(schemaLen int) ([]bool, error) {
	flags := make([]bool, schemaLen)
	for i := range e.OrderedList {
		flags[e.OrderedList[i].Col.Index] = true
	}
	return flags, nil
}