codec: encode checksum for timestamp consider tz (#50896)
close pingcap/tidb#50851
This commit is contained in:
@ -1416,7 +1416,7 @@ func (w *updateColumnWorker) calcChecksums() []uint32 {
|
||||
if !sort.IsSorted(w.checksumBuffer) {
|
||||
sort.Sort(w.checksumBuffer)
|
||||
}
|
||||
checksum, err := w.checksumBuffer.Checksum()
|
||||
checksum, err := w.checksumBuffer.Checksum(w.sessCtx.GetSessionVars().StmtCtx.TimeZone())
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err))
|
||||
return nil
|
||||
|
||||
@ -1811,11 +1811,14 @@ func (t *TableCommon) calcChecksums(sctx table.MutateContext, h kv.Handle, data
|
||||
}
|
||||
checksums := make([]uint32, len(data))
|
||||
for i, cols := range data {
|
||||
row := rowcodec.RowData{Cols: cols, Data: buf}
|
||||
row := rowcodec.RowData{
|
||||
Cols: cols,
|
||||
Data: buf,
|
||||
}
|
||||
if !sort.IsSorted(row) {
|
||||
sort.Sort(row)
|
||||
}
|
||||
checksum, err := row.Checksum()
|
||||
checksum, err := row.Checksum(sctx.GetSessionVars().StmtCtx.TimeZone())
|
||||
buf = row.Data
|
||||
if err != nil {
|
||||
logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error,
|
||||
|
||||
@ -1666,7 +1666,7 @@ func TestWriteWithChecksums(t *testing.T) {
|
||||
}
|
||||
data := rowcodec.RowData{Cols: cols}
|
||||
sort.Sort(data)
|
||||
checksum, err := data.Checksum()
|
||||
checksum, err := data.Checksum(time.Local)
|
||||
assert.NoError(t, err)
|
||||
expectChecksums = append(expectChecksums, checksum)
|
||||
}
|
||||
|
||||
@ -41,7 +41,7 @@ func BenchmarkChecksum(b *testing.B) {
|
||||
}
|
||||
row := rowcodec.RowData{Cols: cols}
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := row.Checksum()
|
||||
_, err := row.Checksum(time.Local)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"hash/crc32"
|
||||
"math"
|
||||
"reflect"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
@ -253,8 +254,8 @@ type ColData struct {
|
||||
}
|
||||
|
||||
// Encode encodes the column datum into bytes for checksum. If buf provided, append encoded data to it.
|
||||
func (c ColData) Encode(buf []byte) ([]byte, error) {
|
||||
return appendDatumForChecksum(buf, c.Datum, c.GetType())
|
||||
func (c ColData) Encode(loc *time.Location, buf []byte) ([]byte, error) {
|
||||
return appendDatumForChecksum(loc, buf, c.Datum, c.GetType())
|
||||
}
|
||||
|
||||
// RowData is a list of ColData for row checksum calculation.
|
||||
@ -276,13 +277,13 @@ func (r RowData) Less(i int, j int) bool { return r.Cols[i].ID < r.Cols[j].ID }
|
||||
func (r RowData) Swap(i int, j int) { r.Cols[i], r.Cols[j] = r.Cols[j], r.Cols[i] }
|
||||
|
||||
// Encode encodes all columns into bytes (for test purpose).
|
||||
func (r *RowData) Encode() ([]byte, error) {
|
||||
func (r *RowData) Encode(loc *time.Location) ([]byte, error) {
|
||||
var err error
|
||||
if len(r.Data) > 0 {
|
||||
r.Data = r.Data[:0]
|
||||
}
|
||||
for _, col := range r.Cols {
|
||||
r.Data, err = col.Encode(r.Data)
|
||||
r.Data, err = col.Encode(loc, r.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -291,12 +292,12 @@ func (r *RowData) Encode() ([]byte, error) {
|
||||
}
|
||||
|
||||
// Checksum calculates the checksum of columns. Callers should make sure columns are sorted by id.
|
||||
func (r *RowData) Checksum() (checksum uint32, err error) {
|
||||
func (r *RowData) Checksum(loc *time.Location) (checksum uint32, err error) {
|
||||
for _, col := range r.Cols {
|
||||
if len(r.Data) > 0 {
|
||||
r.Data = r.Data[:0]
|
||||
}
|
||||
r.Data, err = col.Encode(r.Data)
|
||||
r.Data, err = col.Encode(loc, r.Data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -305,7 +306,7 @@ func (r *RowData) Checksum() (checksum uint32, err error) {
|
||||
return checksum, nil
|
||||
}
|
||||
|
||||
func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
|
||||
func appendDatumForChecksum(loc *time.Location, buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
|
||||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
// catch panic when datum and type mismatch
|
||||
@ -321,7 +322,14 @@ func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte,
|
||||
case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
|
||||
out = appendLengthValue(buf, dat.GetBytes())
|
||||
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeNewDate:
|
||||
out = appendLengthValue(buf, []byte(dat.GetMysqlTime().String()))
|
||||
t := dat.GetMysqlTime()
|
||||
if t.Type() == mysql.TypeTimestamp && loc != nil && loc != time.UTC {
|
||||
err = t.ConvertTimeZone(loc, time.UTC)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
out = appendLengthValue(buf, []byte(t.String()))
|
||||
case mysql.TypeDuration:
|
||||
out = appendLengthValue(buf, []byte(dat.GetMysqlDuration().String()))
|
||||
case mysql.TypeFloat, mysql.TypeDouble:
|
||||
|
||||
@ -1096,7 +1096,7 @@ func TestColumnEncode(t *testing.T) {
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
col := rowcodec.ColData{&model.ColumnInfo{FieldType: *tt.typ}, &tt.dat}
|
||||
raw, err := col.Encode(buf[:0])
|
||||
raw, err := col.Encode(time.Local, buf[:0])
|
||||
if tt.ok {
|
||||
require.NoError(t, err)
|
||||
if len(tt.raw) == 0 {
|
||||
@ -1145,7 +1145,7 @@ func TestColumnEncode(t *testing.T) {
|
||||
ft := types.NewFieldType(typ)
|
||||
dat := types.NewDatum(nil)
|
||||
col := rowcodec.ColData{&model.ColumnInfo{FieldType: *ft}, &dat}
|
||||
raw, err := col.Encode(nil)
|
||||
raw, err := col.Encode(time.Local, nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, raw, 0)
|
||||
}
|
||||
@ -1161,7 +1161,10 @@ func TestRowChecksum(t *testing.T) {
|
||||
col2 := rowcodec.ColData{&model.ColumnInfo{ID: 2, FieldType: *typ2}, &dat2}
|
||||
typ3 := types.NewFieldType(mysql.TypeVarchar)
|
||||
dat3 := types.NewDatum("foobar")
|
||||
col3 := rowcodec.ColData{&model.ColumnInfo{ID: 2, FieldType: *typ3}, &dat3}
|
||||
col3 := rowcodec.ColData{&model.ColumnInfo{ID: 3, FieldType: *typ3}, &dat3}
|
||||
typ4 := types.NewFieldType(mysql.TypeTimestamp)
|
||||
dat4 := types.NewTimeDatum(types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 6))
|
||||
col4 := rowcodec.ColData{&model.ColumnInfo{ID: 4, FieldType: *typ4}, &dat4}
|
||||
buf := make([]byte, 0, 64)
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
@ -1170,17 +1173,17 @@ func TestRowChecksum(t *testing.T) {
|
||||
{"nil", nil},
|
||||
{"empty", []rowcodec.ColData{}},
|
||||
{"nullonly", []rowcodec.ColData{col1}},
|
||||
{"ordered", []rowcodec.ColData{col1, col2, col3}},
|
||||
{"unordered", []rowcodec.ColData{col3, col1, col2}},
|
||||
{"ordered", []rowcodec.ColData{col1, col2, col3, col4}},
|
||||
{"unordered", []rowcodec.ColData{col3, col1, col4, col2}},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
row := rowcodec.RowData{tt.cols, buf}
|
||||
if !sort.IsSorted(row) {
|
||||
sort.Sort(row)
|
||||
}
|
||||
checksum, err := row.Checksum()
|
||||
checksum, err := row.Checksum(time.Local)
|
||||
require.NoError(t, err)
|
||||
raw, err := row.Encode()
|
||||
raw, err := row.Encode(time.Local)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, crc32.ChecksumIEEE(raw), checksum)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user