tidb/pkg/util/rowcodec/encoder.go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rowcodec

import (
"encoding/binary"
"hash/crc32"
"math"
"sort"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"go.uber.org/multierr"
)

// Encoder is used to encode a row.
type Encoder struct {
row
tempColIDs []int64
values []*types.Datum
// Enable indicates whether this encoder should be used.
Enable bool
}

// Encode encodes a row from a datums slice.
// `buf` is not truncated before encoding.
// This function may return both valid encoded bytes and an error (actually a `"pingcap/errors".ErrorGroup`). If the caller
// expects to handle these errors according to `SQL_MODE` or other configuration, please refer to `pkg/errctx`.
// The caller needs to ensure the key is not nil if a checksum is required.
func (encoder *Encoder) Encode(loc *time.Location, colIDs []int64, values []types.Datum, checksum Checksum, buf []byte) ([]byte, error) {
encoder.reset()
encoder.appendColVals(colIDs, values)
numCols, notNullIdx := encoder.reformatCols()
err := encoder.encodeRowCols(loc, numCols, notNullIdx)
if err != nil {
return nil, err
}
if checksum == nil {
checksum = NoChecksum{}
}
return checksum.encode(encoder, buf)
}
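
// reset clears the encoder's intermediate state so it can be reused to encode another row.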
func (encoder *Encoder) reset() {
encoder.flags = 0
encoder.numNotNullCols = 0
encoder.numNullCols = 0
encoder.data = encoder.data[:0]
encoder.tempColIDs = encoder.tempColIDs[:0]
encoder.values = encoder.values[:0]
encoder.offsets32 = encoder.offsets32[:0]
encoder.offsets = encoder.offsets[:0]
encoder.checksumHeader = 0
encoder.checksum1 = 0
encoder.checksum2 = 0
}
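
// appendColVals appends every column ID together with its corresponding datum to the encoder.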
func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {
for i, colID := range colIDs {
encoder.appendColVal(colID, &values[i])
}
}
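
// appendColVal buffers a single column. A column ID greater than 255 cannot be stored in one byte,
// so it forces the large row format; null and not-null columns are counted separately.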
func (encoder *Encoder) appendColVal(colID int64, d *types.Datum) {
if colID > 255 {
encoder.flags |= rowFlagLarge
}
if d.IsNull() {
encoder.numNullCols++
} else {
encoder.numNotNullCols++
}
encoder.tempColIDs = append(encoder.tempColIDs, colID)
encoder.values = append(encoder.values, d)
}
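
// reformatCols groups the buffered columns so that not-null columns come before null columns,
// copies the column IDs into the small (byte) or large (uint32) ID slice, and sorts each group
// by column ID. It returns the total number of columns and the number of not-null columns.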
func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
r := &encoder.row
numCols = len(encoder.tempColIDs)
nullIdx := numCols - int(r.numNullCols)
notNullIdx = 0
if r.large() {
r.initColIDs32()
r.initOffsets32()
} else {
r.initColIDs()
r.initOffsets()
}
for i, colID := range encoder.tempColIDs {
if encoder.values[i].IsNull() {
if r.large() {
r.colIDs32[nullIdx] = uint32(colID)
} else {
r.colIDs[nullIdx] = byte(colID)
}
nullIdx++
} else {
if r.large() {
r.colIDs32[notNullIdx] = uint32(colID)
} else {
r.colIDs[notNullIdx] = byte(colID)
}
encoder.values[notNullIdx] = encoder.values[i]
notNullIdx++
}
}
if r.large() {
largeNotNullSorter := (*largeNotNullSorter)(encoder)
sort.Sort(largeNotNullSorter)
if r.numNullCols > 0 {
largeNullSorter := (*largeNullSorter)(encoder)
sort.Sort(largeNullSorter)
}
} else {
smallNotNullSorter := (*smallNotNullSorter)(encoder)
sort.Sort(smallNotNullSorter)
if r.numNullCols > 0 {
smallNullSorter := (*smallNullSorter)(encoder)
sort.Sort(smallNullSorter)
}
}
return
}
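
// encodeRowCols encodes every not-null datum into r.data and records the end offset of each value.
// If the encoded data grows beyond math.MaxUint16, the row is upgraded to the large format with
// 32-bit column IDs and offsets. Per-datum errors are accumulated with multierr and returned together.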
func (encoder *Encoder) encodeRowCols(loc *time.Location, numCols, notNullIdx int) error {
r := &encoder.row
var errs error
for i := range notNullIdx {
d := encoder.values[i]
var err error
r.data, err = encodeValueDatum(loc, d, r.data)
if err != nil {
errs = multierr.Append(errs, err)
}
// Convert to the large format once the encoded data no longer fits in uint16 offsets.
if len(r.data) > math.MaxUint16 && !r.large() {
r.initColIDs32()
for j := range numCols {
r.colIDs32[j] = uint32(r.colIDs[j])
}
r.initOffsets32()
for j := 0; j <= i; j++ {
r.offsets32[j] = uint32(r.offsets[j])
}
r.flags |= rowFlagLarge
}
if r.large() {
r.offsets32[i] = uint32(len(r.data))
} else {
r.offsets[i] = uint16(len(r.data))
}
}
return errs
}

// encodeValueDatum encodes one row datum entry into bytes.
// Because the datum is encoded as a value, this method flattens the value type like tablecodec.flatten.
func encodeValueDatum(loc *time.Location, d *types.Datum, buffer []byte) (nBuffer []byte, err error) {
switch d.Kind() {
case types.KindInt64:
buffer = encodeInt(buffer, d.GetInt64())
case types.KindUint64:
buffer = encodeUint(buffer, d.GetUint64())
case types.KindString, types.KindBytes:
buffer = append(buffer, d.GetBytes()...)
case types.KindMysqlTime:
// for the mysql datetime, timestamp and date types
t := d.GetMysqlTime()
if t.Type() == mysql.TypeTimestamp && loc != nil && loc != time.UTC {
err = t.ConvertTimeZone(loc, time.UTC)
if err != nil {
return
}
}
var v uint64
v, err = t.ToPackedUint()
if err != nil {
return
}
buffer = encodeUint(buffer, v)
case types.KindMysqlDuration:
buffer = encodeInt(buffer, int64(d.GetMysqlDuration().Duration))
case types.KindMysqlEnum:
buffer = encodeUint(buffer, d.GetMysqlEnum().Value)
case types.KindMysqlSet:
buffer = encodeUint(buffer, d.GetMysqlSet().Value)
case types.KindBinaryLiteral, types.KindMysqlBit:
// The conversion should not fail here, since convertToMysqlBit guarantees the literal fits in a uint64.
var val uint64
val, err = d.GetBinaryLiteral().ToInt(types.StrictContext)
if err != nil {
return
}
buffer = encodeUint(buffer, val)
case types.KindFloat32, types.KindFloat64:
buffer = codec.EncodeFloat(buffer, d.GetFloat64())
case types.KindMysqlDecimal:
buffer, err = codec.EncodeDecimal(buffer, d.GetMysqlDecimal(), d.Length(), d.Frac())
case types.KindMysqlJSON:
j := d.GetMysqlJSON()
buffer = append(buffer, j.TypeCode)
buffer = append(buffer, j.Value...)
case types.KindVectorFloat32:
v := d.GetVectorFloat32()
buffer = v.SerializeTo(buffer)
default:
err = errors.Errorf("unsupported encode type %d", d.Kind())
}
nBuffer = buffer
return
}

// Checksum is used to calculate and append checksum data into the raw bytes
type Checksum interface {
encode(encoder *Encoder, buf []byte) ([]byte, error)
}

// NoChecksum indicates no checksum is encoded into the returned raw bytes.
type NoChecksum struct{}

func (NoChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
encoder.flags &^= rowFlagChecksum // revert checksum flag
return encoder.toBytes(buf), nil
}

// checksumVersionColumn was introduced in v7.1.0.
const checksumVersionColumn byte = 0

// checksumVersionRawKey was introduced in v8.3.0.
const checksumVersionRawKey byte = 1

// checksumVersionRawHandle was introduced in v8.4.0.
const checksumVersionRawHandle byte = 2

// RawChecksum indicates that a checksum of the raw bytes is calculated and appended to the raw bytes.
type RawChecksum struct {
Handle kv.Handle
}

func (c RawChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
encoder.flags |= rowFlagChecksum
encoder.checksumHeader &^= checksumFlagExtra // revert extra checksum flag
encoder.checksumHeader &^= checksumMaskVersion // revert checksum version
encoder.checksumHeader |= checksumVersionRawHandle // set checksum version
valueBytes := encoder.toBytes(buf)
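// The checksum covers the encoded row bytes, the one-byte checksum header, and the encoded handle.
// The final layout is: row bytes | checksum header byte | 4-byte little-endian CRC32.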
valueBytes = append(valueBytes, encoder.checksumHeader)
encoder.checksum1 = crc32.Checksum(valueBytes, crc32.IEEETable)
encoder.checksum1 = crc32.Update(encoder.checksum1, crc32.IEEETable, c.Handle.Encoded())
valueBytes = binary.LittleEndian.AppendUint32(valueBytes, encoder.checksum1)
return valueBytes, nil
}