361 lines
11 KiB
Go
361 lines
11 KiB
Go
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// TODO combine with the pkg/kv package outside.
|
|
|
|
package kv
|
|
|
|
import (
|
|
"cmp"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"slices"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/lightning/metric"
|
|
"github.com/pingcap/tidb/pkg/lightning/verification"
|
|
"github.com/pingcap/tidb/pkg/meta/autoid"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql" //nolint: goimports
|
|
"github.com/pingcap/tidb/pkg/table"
|
|
"github.com/pingcap/tidb/pkg/tablecodec"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
)
|
|
|
|
type tableKVEncoder struct {
|
|
*BaseKVEncoder
|
|
metrics *metric.Metrics
|
|
}
|
|
|
|
// GetSession4test is only used for test.
|
|
func GetSession4test(encoder encode.Encoder) *Session {
|
|
return encoder.(*tableKVEncoder).SessionCtx
|
|
}
|
|
|
|
// NewTableKVEncoder creates a new tableKVEncoder.
|
|
func NewTableKVEncoder(
|
|
config *encode.EncodingConfig,
|
|
metrics *metric.Metrics,
|
|
) (encode.Encoder, error) {
|
|
if metrics != nil {
|
|
metrics.KvEncoderCounter.WithLabelValues("open").Inc()
|
|
}
|
|
baseKVEncoder, err := NewBaseKVEncoder(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &tableKVEncoder{
|
|
BaseKVEncoder: baseKVEncoder,
|
|
metrics: metrics,
|
|
}, nil
|
|
}
|
|
|
|
// CollectGeneratedColumns collects all expressions required to evaluate the
|
|
// results of all generated columns. The returning slice is in evaluation order.
|
|
func CollectGeneratedColumns(se *Session, meta *model.TableInfo, cols []*table.Column) ([]GeneratedCol, error) {
|
|
hasGenCol := false
|
|
for _, col := range cols {
|
|
if col.GeneratedExpr != nil {
|
|
hasGenCol = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !hasGenCol {
|
|
return nil, nil
|
|
}
|
|
|
|
// not using TableInfo2SchemaAndNames to avoid parsing all virtual generated columns again.
|
|
exprColumns := make([]*expression.Column, 0, len(cols))
|
|
names := make(types.NameSlice, 0, len(cols))
|
|
for i, col := range cols {
|
|
names = append(names, &types.FieldName{
|
|
OrigTblName: meta.Name,
|
|
OrigColName: col.Name,
|
|
TblName: meta.Name,
|
|
ColName: col.Name,
|
|
})
|
|
exprColumns = append(exprColumns, &expression.Column{
|
|
RetType: col.FieldType.Clone(),
|
|
ID: col.ID,
|
|
UniqueID: int64(i),
|
|
Index: col.Offset,
|
|
OrigName: names[i].String(),
|
|
IsHidden: col.Hidden,
|
|
})
|
|
}
|
|
schema := expression.NewSchema(exprColumns...)
|
|
|
|
// as long as we have a stored generated column, all columns it referred to must be evaluated as well.
|
|
// for simplicity we just evaluate all generated columns (virtual or not) before the last stored one.
|
|
var genCols []GeneratedCol
|
|
for i, col := range cols {
|
|
if col.GeneratedExpr != nil {
|
|
expr, err := expression.BuildSimpleExpr(
|
|
se.GetExprCtx(),
|
|
col.GeneratedExpr.Internal(),
|
|
expression.WithInputSchemaAndNames(schema, names, meta),
|
|
expression.WithAllowCastArray(true),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
genCols = append(genCols, GeneratedCol{
|
|
Index: i,
|
|
Expr: expr,
|
|
})
|
|
}
|
|
}
|
|
|
|
// order the result by column offset so they match the evaluation order.
|
|
slices.SortFunc(genCols, func(i, j GeneratedCol) int {
|
|
return cmp.Compare(cols[i.Index].Offset, cols[j.Index].Offset)
|
|
})
|
|
return genCols, nil
|
|
}
|
|
|
|
// Close implements the Encoder interface.
|
|
func (kvcodec *tableKVEncoder) Close() {
|
|
kvcodec.SessionCtx.Close()
|
|
if kvcodec.metrics != nil {
|
|
kvcodec.metrics.KvEncoderCounter.WithLabelValues("close").Inc()
|
|
}
|
|
}
|
|
|
|
// Pairs implements the Encoder interface.
|
|
type Pairs struct {
|
|
Pairs []common.KvPair
|
|
BytesBuf *BytesBuf
|
|
MemBuf *MemBuf
|
|
}
|
|
|
|
// GroupedPairs is a map from index ID to KvPairs.
|
|
type GroupedPairs map[int64][]common.KvPair
|
|
|
|
// SplitIntoChunks implements the encode.Rows interface. It just satisfies the
|
|
// type system and should never be called.
|
|
func (GroupedPairs) SplitIntoChunks(int) []encode.Rows {
|
|
panic("not implemented")
|
|
}
|
|
|
|
// Clear implements the encode.Rows interface. It just satisfies the type system
|
|
// and should never be called.
|
|
func (GroupedPairs) Clear() encode.Rows {
|
|
panic("not implemented")
|
|
}
|
|
|
|
// MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
|
|
// mainly used for testing only. The resulting Rows instance should only be used
|
|
// for the importer backend.
|
|
func MakeRowsFromKvPairs(pairs []common.KvPair) encode.Rows {
|
|
return &Pairs{Pairs: pairs}
|
|
}
|
|
|
|
// MakeRowFromKvPairs converts a KvPair slice into a Row instance. This is
|
|
// mainly used for testing only. The resulting Row instance should only be used
|
|
// for the importer backend.
|
|
func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
|
|
return &Pairs{Pairs: pairs}
|
|
}
|
|
|
|
// Rows2KvPairs converts a Rows instance constructed from MakeRowsFromKvPairs
|
|
// back into a slice of KvPair. This method panics if the Rows is not
|
|
// constructed in such way.
|
|
func Rows2KvPairs(rows encode.Rows) []common.KvPair {
|
|
switch v := rows.(type) {
|
|
case *Pairs:
|
|
return v.Pairs
|
|
case GroupedPairs:
|
|
cnt := 0
|
|
for _, pairs := range v {
|
|
cnt += len(pairs)
|
|
}
|
|
res := make([]common.KvPair, 0, cnt)
|
|
for _, pairs := range v {
|
|
res = append(res, pairs...)
|
|
}
|
|
return res
|
|
}
|
|
panic(fmt.Sprintf("unknown Rows type %T", rows))
|
|
}
|
|
|
|
// Row2KvPairs converts a Row instance constructed from MakeRowFromKvPairs
|
|
// back into a slice of KvPair. This method panics if the Row is not
|
|
// constructed in such way.
|
|
func Row2KvPairs(row encode.Row) []common.KvPair {
|
|
return row.(*Pairs).Pairs
|
|
}
|
|
|
|
// ClearRow recycles the memory used by the row.
|
|
func ClearRow(row encode.Row) {
|
|
if pairs, ok := row.(*Pairs); ok {
|
|
pairs.Clear()
|
|
}
|
|
}
|
|
|
|
// Encode a row of data into KV pairs.
|
|
//
|
|
// See comments in `(*TableRestore).initializeColumns` for the meaning of the
|
|
// `columnPermutation` parameter.
|
|
func (kvcodec *tableKVEncoder) Encode(row []types.Datum,
|
|
rowID int64, columnPermutation []int, _ int64) (encode.Row, error) {
|
|
// we ignore warnings when encoding rows now, but warnings uses the same memory as parser, since the input
|
|
// row []types.Datum share the same underlying buf, and when doing CastValue, we're using hack.String/hack.Slice.
|
|
// when generating error such as mysql.ErrDataOutOfRange, the data will be part of the error, causing the buf
|
|
// unable to release. So we truncate the warnings here.
|
|
defer kvcodec.TruncateWarns()
|
|
var value types.Datum
|
|
var err error
|
|
|
|
record := kvcodec.GetOrCreateRecord()
|
|
for i, col := range kvcodec.Columns {
|
|
var theDatum *types.Datum
|
|
j := columnPermutation[i]
|
|
if j >= 0 && j < len(row) {
|
|
theDatum = &row[j]
|
|
}
|
|
// TODO: consider move the cast before calling ProcessColDatum. see https://github.com/pingcap/tidb/pull/60397/files#r2058032695
|
|
value, err = kvcodec.ProcessColDatum(col, rowID, theDatum, true)
|
|
if err != nil {
|
|
return nil, kvcodec.LogKVConvertFailed(row, j, col.ToInfo(), err)
|
|
}
|
|
|
|
record = append(record, value)
|
|
}
|
|
|
|
if common.TableHasAutoRowID(kvcodec.table.Meta()) {
|
|
rowValue := rowID
|
|
j := columnPermutation[len(kvcodec.Columns)]
|
|
if j >= 0 && j < len(row) {
|
|
value, err = table.CastColumnValue(kvcodec.SessionCtx.GetExprCtx(), row[j],
|
|
ExtraHandleColumnInfo, false, false)
|
|
rowValue = value.GetInt64()
|
|
} else {
|
|
rowID := kvcodec.AutoIDFn(rowID)
|
|
value, err = types.NewIntDatum(rowID), nil
|
|
}
|
|
if err != nil {
|
|
return nil, kvcodec.LogKVConvertFailed(row, j, ExtraHandleColumnInfo, err)
|
|
}
|
|
record = append(record, value)
|
|
alloc := kvcodec.TableAllocators().Get(autoid.RowIDAllocType)
|
|
if err := alloc.Rebase(context.Background(), rowValue, false); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
if len(kvcodec.GenCols) > 0 {
|
|
if errCol, err := kvcodec.EvalGeneratedColumns(record, kvcodec.Columns); err != nil {
|
|
return nil, kvcodec.LogEvalGenExprFailed(row, errCol, err)
|
|
}
|
|
}
|
|
|
|
return kvcodec.Record2KV(record, row, rowID)
|
|
}
|
|
|
|
// IsAutoIncCol return true if the column is auto increment column.
|
|
func IsAutoIncCol(colInfo *model.ColumnInfo) bool {
|
|
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
|
|
}
|
|
|
|
// GetEncoderIncrementalID return Auto increment id.
|
|
func GetEncoderIncrementalID(encoder encode.Encoder, id int64) int64 {
|
|
return encoder.(*tableKVEncoder).AutoIDFn(id)
|
|
}
|
|
|
|
// GetEncoderSe return session.
|
|
func GetEncoderSe(encoder encode.Encoder) *Session {
|
|
return encoder.(*tableKVEncoder).SessionCtx
|
|
}
|
|
|
|
// GetActualDatum export getActualDatum function.
|
|
func GetActualDatum(encoder encode.Encoder, col *table.Column, rowID int64,
|
|
inputDatum *types.Datum) (types.Datum, error) {
|
|
return encoder.(*tableKVEncoder).getActualDatum(col, rowID, inputDatum, true)
|
|
}
|
|
|
|
// GetAutoRecordID returns the record ID for an auto-increment field.
|
|
// get record value for auto-increment field
|
|
//
|
|
// See:
|
|
//
|
|
// https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852
|
|
func GetAutoRecordID(d types.Datum, target *types.FieldType) int64 {
|
|
switch target.GetType() {
|
|
case mysql.TypeFloat, mysql.TypeDouble:
|
|
return int64(math.Round(d.GetFloat64()))
|
|
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
|
|
return d.GetInt64()
|
|
default:
|
|
panic(fmt.Sprintf("unsupported auto-increment field type '%d'", target.GetType()))
|
|
}
|
|
}
|
|
|
|
// Size returns the total size of the key-value pairs.
|
|
func (kvs *Pairs) Size() uint64 {
|
|
size := uint64(0)
|
|
for _, kv := range kvs.Pairs {
|
|
size += uint64(len(kv.Key) + len(kv.Val))
|
|
}
|
|
return size
|
|
}
|
|
|
|
// ClassifyAndAppend separates the key-value pairs into data and index key-value pairs.
|
|
func (kvs *Pairs) ClassifyAndAppend(
|
|
data *encode.Rows,
|
|
dataChecksum *verification.KVChecksum,
|
|
indices *encode.Rows,
|
|
indexChecksum *verification.KVChecksum,
|
|
) {
|
|
dataKVs := (*data).(*Pairs)
|
|
indexKVs := (*indices).(*Pairs)
|
|
|
|
for _, kv := range kvs.Pairs {
|
|
if kv.Key[tablecodec.TableSplitKeyLen+1] == 'r' {
|
|
dataKVs.Pairs = append(dataKVs.Pairs, kv)
|
|
dataChecksum.UpdateOne(kv)
|
|
} else {
|
|
indexKVs.Pairs = append(indexKVs.Pairs, kv)
|
|
indexChecksum.UpdateOne(kv)
|
|
}
|
|
}
|
|
|
|
// the related buf is shared, so we only need to set it into one of the kvs so it can be released
|
|
if kvs.BytesBuf != nil {
|
|
dataKVs.BytesBuf = kvs.BytesBuf
|
|
dataKVs.MemBuf = kvs.MemBuf
|
|
kvs.BytesBuf = nil
|
|
kvs.MemBuf = nil
|
|
}
|
|
|
|
*data = dataKVs
|
|
*indices = indexKVs
|
|
}
|
|
|
|
// Clear clears the key-value pairs.
|
|
func (kvs *Pairs) Clear() encode.Rows {
|
|
if kvs.BytesBuf != nil {
|
|
kvs.MemBuf.Recycle(kvs.BytesBuf)
|
|
kvs.BytesBuf = nil
|
|
kvs.MemBuf = nil
|
|
}
|
|
kvs.Pairs = kvs.Pairs[:0]
|
|
return kvs
|
|
}
|