br/lightning: change KvPair's row ID type from int64 to []bytes (#41787)
ref pingcap/tidb#37119
This commit is contained in:
@ -433,7 +433,8 @@ func (kvcodec *tableKVEncoder) Encode(
|
||||
}
|
||||
kvPairs := kvcodec.se.takeKvPairs()
|
||||
for i := 0; i < len(kvPairs.pairs); i++ {
|
||||
kvPairs.pairs[i].RowID = rowID
|
||||
var encoded [9]byte // The max length of encoded int64 is 9.
|
||||
kvPairs.pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
|
||||
}
|
||||
kvcodec.recordCache = record[:0]
|
||||
return kvPairs, nil
|
||||
|
||||
@ -113,7 +113,7 @@ func TestEncode(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
|
||||
Val: []uint8{0x8, 0x2, 0x8, 0x2},
|
||||
RowID: 2,
|
||||
RowID: common.EncodeIntRowID(2),
|
||||
},
|
||||
}))
|
||||
|
||||
@ -140,7 +140,7 @@ func TestEncode(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
|
||||
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
|
||||
RowID: 1,
|
||||
RowID: common.EncodeIntRowID(1),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@ -274,7 +274,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
|
||||
0x1, 0x0, // not null offsets = [1]
|
||||
0x7f, // column version = 127 (10000000 clamped to TINYINT)
|
||||
},
|
||||
RowID: 1,
|
||||
RowID: common.EncodeIntRowID(1),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
|
||||
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
|
||||
RowID: 70,
|
||||
RowID: common.EncodeIntRowID(70),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@ -346,12 +346,12 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
|
||||
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
|
||||
RowID: 70,
|
||||
RowID: common.EncodeIntRowID(70),
|
||||
},
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
|
||||
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
|
||||
RowID: 70,
|
||||
RowID: common.EncodeIntRowID(70),
|
||||
},
|
||||
}), pairsExpect)
|
||||
|
||||
@ -459,7 +459,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
|
||||
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
|
||||
RowID: 70,
|
||||
RowID: common.EncodeIntRowID(70),
|
||||
},
|
||||
}))
|
||||
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(70))
|
||||
@ -470,7 +470,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
|
||||
{
|
||||
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
|
||||
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
|
||||
RowID: 71,
|
||||
RowID: common.EncodeIntRowID(71),
|
||||
},
|
||||
}))
|
||||
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(71))
|
||||
|
||||
@ -699,8 +699,8 @@ func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, du
|
||||
}
|
||||
|
||||
// Delete the key range in duplicate DB since we have the duplicates have been collected.
|
||||
rawStartKey := keyAdapter.Encode(nil, task.StartKey, math.MinInt64)
|
||||
rawEndKey := keyAdapter.Encode(nil, task.EndKey, math.MinInt64)
|
||||
rawStartKey := keyAdapter.Encode(nil, task.StartKey, MinRowID)
|
||||
rawEndKey := keyAdapter.Encode(nil, task.EndKey, MinRowID)
|
||||
err = dupDB.DeleteRange(rawStartKey, rawEndKey, nil)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
@ -1070,7 +1070,7 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
|
||||
keyAdapter := w.engine.keyAdapter
|
||||
totalKeySize := 0
|
||||
for i := 0; i < len(kvs); i++ {
|
||||
keySize := keyAdapter.EncodedLen(kvs[i].Key)
|
||||
keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
|
||||
w.batchSize += int64(keySize + len(kvs[i].Val))
|
||||
totalKeySize += keySize
|
||||
}
|
||||
@ -1107,7 +1107,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
|
||||
}
|
||||
lastKey = pair.Key
|
||||
w.batchSize += int64(len(pair.Key) + len(pair.Val))
|
||||
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key))
|
||||
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
|
||||
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
|
||||
val := w.kvBuffer.AddBytes(pair.Val)
|
||||
if cnt < l {
|
||||
|
||||
@ -17,7 +17,6 @@ package local
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
|
||||
@ -91,7 +90,7 @@ type dupDetectOpt struct {
|
||||
}
|
||||
|
||||
func (d *dupDetectIter) Seek(key []byte) bool {
|
||||
rawKey := d.keyAdapter.Encode(nil, key, 0)
|
||||
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
|
||||
if d.err != nil || !d.iter.SeekGE(rawKey) {
|
||||
return false
|
||||
}
|
||||
@ -209,10 +208,10 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
|
||||
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter {
|
||||
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
|
||||
if len(opts.LowerBound) > 0 {
|
||||
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
|
||||
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
|
||||
}
|
||||
if len(opts.UpperBound) > 0 {
|
||||
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
|
||||
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
|
||||
}
|
||||
return &dupDetectIter{
|
||||
ctx: ctx,
|
||||
@ -232,7 +231,7 @@ type dupDBIter struct {
|
||||
}
|
||||
|
||||
func (d *dupDBIter) Seek(key []byte) bool {
|
||||
rawKey := d.keyAdapter.Encode(nil, key, 0)
|
||||
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
|
||||
if d.err != nil || !d.iter.SeekGE(rawKey) {
|
||||
return false
|
||||
}
|
||||
@ -296,10 +295,10 @@ var _ Iter = &dupDBIter{}
|
||||
func newDupDBIter(dupDB *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions) *dupDBIter {
|
||||
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
|
||||
if len(opts.LowerBound) > 0 {
|
||||
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
|
||||
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
|
||||
}
|
||||
if len(opts.UpperBound) > 0 {
|
||||
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
|
||||
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
|
||||
}
|
||||
return &dupDBIter{
|
||||
iter: dupDB.NewIter(newOpts),
|
||||
|
||||
@ -37,7 +37,7 @@ func TestDupDetectIterator(t *testing.T) {
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: randBytes(32),
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
}
|
||||
@ -47,13 +47,13 @@ func TestDupDetectIterator(t *testing.T) {
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: key,
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: key,
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
}
|
||||
@ -63,19 +63,19 @@ func TestDupDetectIterator(t *testing.T) {
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: key,
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: key,
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
pairs = append(pairs, common.KvPair{
|
||||
Key: key,
|
||||
Val: randBytes(128),
|
||||
RowID: prevRowMax,
|
||||
RowID: common.EncodeIntRowID(prevRowMax),
|
||||
})
|
||||
prevRowMax++
|
||||
}
|
||||
@ -184,22 +184,22 @@ func TestDupDetectIterSeek(t *testing.T) {
|
||||
{
|
||||
Key: []byte{1, 2, 3, 0},
|
||||
Val: randBytes(128),
|
||||
RowID: 1,
|
||||
RowID: common.EncodeIntRowID(1),
|
||||
},
|
||||
{
|
||||
Key: []byte{1, 2, 3, 1},
|
||||
Val: randBytes(128),
|
||||
RowID: 2,
|
||||
RowID: common.EncodeIntRowID(2),
|
||||
},
|
||||
{
|
||||
Key: []byte{1, 2, 3, 1},
|
||||
Val: randBytes(128),
|
||||
RowID: 3,
|
||||
RowID: common.EncodeIntRowID(3),
|
||||
},
|
||||
{
|
||||
Key: []byte{1, 2, 3, 2},
|
||||
Val: randBytes(128),
|
||||
RowID: 4,
|
||||
RowID: common.EncodeIntRowID(4),
|
||||
},
|
||||
}
|
||||
|
||||
@ -227,3 +227,17 @@ func TestDupDetectIterSeek(t *testing.T) {
|
||||
require.NoError(t, db.Close())
|
||||
require.NoError(t, dupDB.Close())
|
||||
}
|
||||
|
||||
func TestKeyAdapterEncoding(t *testing.T) {
|
||||
keyAdapter := dupDetectKeyAdapter{}
|
||||
srcKey := []byte{1, 2, 3}
|
||||
v := keyAdapter.Encode(nil, srcKey, common.EncodeIntRowID(1))
|
||||
resKey, err := keyAdapter.Decode(nil, v)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, srcKey, resKey)
|
||||
|
||||
v = keyAdapter.Encode(nil, srcKey, []byte("mock_common_handle"))
|
||||
resKey, err = keyAdapter.Decode(nil, v)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, srcKey, resKey)
|
||||
}
|
||||
|
||||
@ -15,9 +15,10 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
)
|
||||
|
||||
@ -25,13 +26,13 @@ import (
|
||||
type KeyAdapter interface {
|
||||
// Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the
|
||||
// resulting slice. The encoded key is guaranteed to be in ascending order for comparison.
|
||||
Encode(dst []byte, key []byte, rowID int64) []byte
|
||||
Encode(dst []byte, key []byte, rowID []byte) []byte
|
||||
|
||||
// Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice.
|
||||
Decode(dst []byte, data []byte) ([]byte, error)
|
||||
|
||||
// EncodedLen returns the encoded key length.
|
||||
EncodedLen(key []byte) int
|
||||
EncodedLen(key []byte, rowID []byte) int
|
||||
}
|
||||
|
||||
func reallocBytes(b []byte, n int) []byte {
|
||||
@ -46,7 +47,7 @@ func reallocBytes(b []byte, n int) []byte {
|
||||
|
||||
type noopKeyAdapter struct{}
|
||||
|
||||
func (noopKeyAdapter) Encode(dst []byte, key []byte, _ int64) []byte {
|
||||
func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
|
||||
return append(dst, key...)
|
||||
}
|
||||
|
||||
@ -54,7 +55,7 @@ func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
|
||||
return append(dst, data...), nil
|
||||
}
|
||||
|
||||
func (noopKeyAdapter) EncodedLen(key []byte) int {
|
||||
func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
|
||||
return len(key)
|
||||
}
|
||||
|
||||
@ -62,20 +63,25 @@ var _ KeyAdapter = noopKeyAdapter{}
|
||||
|
||||
type dupDetectKeyAdapter struct{}
|
||||
|
||||
func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID int64) []byte {
|
||||
func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
|
||||
dst = codec.EncodeBytes(dst, key)
|
||||
dst = reallocBytes(dst, 8)
|
||||
n := len(dst)
|
||||
dst = dst[:n+8]
|
||||
binary.BigEndian.PutUint64(dst[n:n+8], codec.EncodeIntToCmpUint(rowID))
|
||||
dst = reallocBytes(dst, len(rowID)+2)
|
||||
dst = append(dst, rowID...)
|
||||
rowIDLen := uint16(len(rowID))
|
||||
dst = append(dst, byte(rowIDLen>>8), byte(rowIDLen))
|
||||
return dst
|
||||
}
|
||||
|
||||
func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
|
||||
if len(data) < 8 {
|
||||
if len(data) < 2 {
|
||||
return nil, errors.New("insufficient bytes to decode value")
|
||||
}
|
||||
_, key, err := codec.DecodeBytes(data[:len(data)-8], dst[len(dst):cap(dst)])
|
||||
rowIDLen := uint16(data[len(data)-2])<<8 | uint16(data[len(data)-1])
|
||||
tailLen := int(rowIDLen + 2)
|
||||
if len(data) < tailLen {
|
||||
return nil, errors.New("insufficient bytes to decode value")
|
||||
}
|
||||
_, key, err := codec.DecodeBytes(data[:len(data)-tailLen], dst[len(dst):cap(dst)])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -90,8 +96,13 @@ func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
|
||||
return append(dst, key...), nil
|
||||
}
|
||||
|
||||
func (dupDetectKeyAdapter) EncodedLen(key []byte) int {
|
||||
return codec.EncodedBytesLength(len(key)) + 8
|
||||
func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {
|
||||
return codec.EncodedBytesLength(len(key)) + len(rowID) + 2
|
||||
}
|
||||
|
||||
var _ KeyAdapter = dupDetectKeyAdapter{}
|
||||
|
||||
var (
|
||||
MinRowID = common.EncodeIntRowID(math.MinInt64)
|
||||
ZeroRowID = common.EncodeIntRowID(0)
|
||||
)
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"testing"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -34,8 +35,8 @@ func randBytes(n int) []byte {
|
||||
func TestNoopKeyAdapter(t *testing.T) {
|
||||
keyAdapter := noopKeyAdapter{}
|
||||
key := randBytes(32)
|
||||
require.Len(t, key, keyAdapter.EncodedLen(key))
|
||||
encodedKey := keyAdapter.Encode(nil, key, 0)
|
||||
require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
|
||||
encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
|
||||
require.Equal(t, key, encodedKey)
|
||||
|
||||
decodedKey, err := keyAdapter.Decode(nil, encodedKey)
|
||||
@ -68,8 +69,9 @@ func TestDupDetectKeyAdapter(t *testing.T) {
|
||||
|
||||
keyAdapter := dupDetectKeyAdapter{}
|
||||
for _, input := range inputs {
|
||||
result := keyAdapter.Encode(nil, input.key, input.rowID)
|
||||
require.Equal(t, keyAdapter.EncodedLen(input.key), len(result))
|
||||
encodedRowID := common.EncodeIntRowID(input.rowID)
|
||||
result := keyAdapter.Encode(nil, input.key, encodedRowID)
|
||||
require.Equal(t, keyAdapter.EncodedLen(input.key, encodedRowID), len(result))
|
||||
|
||||
// Decode the result.
|
||||
key, err := keyAdapter.Decode(nil, result)
|
||||
@ -89,7 +91,7 @@ func TestDupDetectKeyOrder(t *testing.T) {
|
||||
keyAdapter := dupDetectKeyAdapter{}
|
||||
encodedKeys := make([][]byte, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
encodedKeys = append(encodedKeys, keyAdapter.Encode(nil, key, 1))
|
||||
encodedKeys = append(encodedKeys, keyAdapter.Encode(nil, key, common.EncodeIntRowID(1)))
|
||||
}
|
||||
sorted := sort.SliceIsSorted(encodedKeys, func(i, j int) bool {
|
||||
return bytes.Compare(encodedKeys[i], encodedKeys[j]) < 0
|
||||
@ -100,8 +102,8 @@ func TestDupDetectKeyOrder(t *testing.T) {
|
||||
func TestDupDetectEncodeDupKey(t *testing.T) {
|
||||
keyAdapter := dupDetectKeyAdapter{}
|
||||
key := randBytes(32)
|
||||
result1 := keyAdapter.Encode(nil, key, 10)
|
||||
result2 := keyAdapter.Encode(nil, key, 20)
|
||||
result1 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(10))
|
||||
result2 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(20))
|
||||
require.NotEqual(t, result1, result2)
|
||||
}
|
||||
|
||||
@ -114,7 +116,7 @@ func TestEncodeKeyToPreAllocatedBuf(t *testing.T) {
|
||||
for _, keyAdapter := range keyAdapters {
|
||||
key := randBytes(32)
|
||||
buf := make([]byte, 256)
|
||||
buf2 := keyAdapter.Encode(buf[:4], key, 1)
|
||||
buf2 := keyAdapter.Encode(buf[:4], key, common.EncodeIntRowID(1))
|
||||
require.True(t, startWithSameMemory(buf, buf2))
|
||||
// Verify the encoded result first.
|
||||
key2, err := keyAdapter.Decode(nil, buf2[4:])
|
||||
@ -126,7 +128,7 @@ func TestEncodeKeyToPreAllocatedBuf(t *testing.T) {
|
||||
func TestDecodeKeyToPreAllocatedBuf(t *testing.T) {
|
||||
data := []byte{
|
||||
0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
|
||||
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf,
|
||||
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
|
||||
}
|
||||
keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
|
||||
for _, keyAdapter := range keyAdapters {
|
||||
@ -143,7 +145,7 @@ func TestDecodeKeyToPreAllocatedBuf(t *testing.T) {
|
||||
func TestDecodeKeyDstIsInsufficient(t *testing.T) {
|
||||
data := []byte{
|
||||
0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
|
||||
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf,
|
||||
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
|
||||
}
|
||||
keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
|
||||
for _, keyAdapter := range keyAdapters {
|
||||
|
||||
@ -26,6 +26,7 @@ go_library(
|
||||
"//store/driver/error",
|
||||
"//table/tables",
|
||||
"//util",
|
||||
"//util/codec",
|
||||
"@com_github_go_sql_driver_mysql//:mysql",
|
||||
"@com_github_pingcap_errors//:errors",
|
||||
"@com_github_pingcap_failpoint//:failpoint",
|
||||
|
||||
@ -38,6 +38,7 @@ import (
|
||||
tmysql "github.com/pingcap/tidb/errno"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/table/tables"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -392,7 +393,15 @@ type KvPair struct {
|
||||
// Val is the value of the KV pair
|
||||
Val []byte
|
||||
// RowID is the row id of the KV pair.
|
||||
RowID int64
|
||||
RowID []byte
|
||||
}
|
||||
|
||||
// EncodeIntRowIDToBuf encodes an int64 row id to a buffer.
|
||||
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint
|
||||
|
||||
// EncodeIntRowID encodes an int64 row id.
|
||||
func EncodeIntRowID(rowID int64) []byte {
|
||||
return codec.EncodeComparableVarint(nil, rowID)
|
||||
}
|
||||
|
||||
// TableHasAutoRowID return whether table has auto generated row id
|
||||
|
||||
@ -1786,7 +1786,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = w.writerCtx.WriteRow(key, idxVal)
|
||||
err = w.writerCtx.WriteRow(key, idxVal, idxRecord.handle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/config"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/util/generic"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"go.uber.org/zap"
|
||||
@ -146,7 +147,7 @@ func (ei *engineInfo) ImportAndClean() error {
|
||||
// WriterContext is used to keep a lightning local writer for each backfill worker.
|
||||
type WriterContext struct {
|
||||
ctx context.Context
|
||||
rowSeq func() int64
|
||||
unique bool
|
||||
lWrite *backend.LocalEngineWriter
|
||||
}
|
||||
|
||||
@ -191,13 +192,9 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex
|
||||
}
|
||||
wc := &WriterContext{
|
||||
ctx: ei.ctx,
|
||||
unique: unique,
|
||||
lWrite: lWrite,
|
||||
}
|
||||
if unique {
|
||||
wc.rowSeq = func() int64 {
|
||||
return ei.rowSeq.Add(1)
|
||||
}
|
||||
}
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
@ -218,12 +215,12 @@ func (ei *engineInfo) closeWriters() error {
|
||||
}
|
||||
|
||||
// WriteRow Write one row into local writer buffer.
|
||||
func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
|
||||
func (wCtx *WriterContext) WriteRow(key, idxVal []byte, handle tidbkv.Handle) error {
|
||||
kvs := make([]common.KvPair, 1)
|
||||
kvs[0].Key = key
|
||||
kvs[0].Val = idxVal
|
||||
if wCtx.rowSeq != nil {
|
||||
kvs[0].RowID = wCtx.rowSeq()
|
||||
if wCtx.unique {
|
||||
kvs[0].RowID = handle.Encoded()
|
||||
}
|
||||
row := kv.MakeRowsFromKvPairs(kvs)
|
||||
return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row)
|
||||
|
||||
Reference in New Issue
Block a user