// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bytes"
	"fmt"
	"math/rand"
	"path/filepath"
	"slices"
	"sort"
	"strconv"
	"testing"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/stretchr/testify/require"
)

// randBytes returns a slice of n pseudo-random bytes.
func randBytes(n int) []byte {
	b := make([]byte, n)
	rand.Read(b)
	return b
}
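
// TestDupDetectIterator builds a key space with 20 unique keys, 20 keys
// duplicated twice, and 10 keys duplicated three times, then checks that
// dupDetectIter yields each key exactly once and records every entry of a
// duplicated key in the duplicates DB.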
func TestDupDetectIterator(t *testing.T) {
	pairs := make([]common.KvPair, 0, 20)
	prevRowMax := int64(0)
	// Unique pairs.
	for range 20 {
		pairs = append(pairs, common.KvPair{
			Key:   randBytes(32),
			Val:   randBytes(128),
			RowID: common.EncodeIntRowID(prevRowMax),
		})
		prevRowMax++
	}
	// Duplicate pairs, each repeating the same key twice.
	for range 20 {
		key := randBytes(32)
		for range 2 {
			pairs = append(pairs, common.KvPair{
				Key:   key,
				Val:   randBytes(128),
				RowID: common.EncodeIntRowID(prevRowMax),
			})
			prevRowMax++
		}
	}
	// Duplicate pairs, each repeating the same key three times.
	for range 10 {
		key := randBytes(32)
		for range 3 {
			pairs = append(pairs, common.KvPair{
				Key:   key,
				Val:   randBytes(128),
				RowID: common.EncodeIntRowID(prevRowMax),
			})
			prevRowMax++
		}
	}
	// Find the expected duplicates: sort by raw key so that equal keys are
	// adjacent, then record each key once in uniqueKeys and every entry of
	// a repeated key in dupPairs.
	var dupPairs []common.KvPair
	sort.Slice(pairs, func(i, j int) bool {
		return bytes.Compare(pairs[i].Key, pairs[j].Key) < 0
	})
	uniqueKeys := make([][]byte, 0)
	for i := 0; i < len(pairs); {
		j := i + 1
		for j < len(pairs) && bytes.Equal(pairs[j-1].Key, pairs[j].Key) {
			j++
		}
		uniqueKeys = append(uniqueKeys, pairs[i].Key)
		if i+1 == j {
			i++
			continue
		}
		dupPairs = append(dupPairs, pairs[i:j]...)
		i = j
	}
	keyAdapter := common.DupDetectKeyAdapter{}
	// Shuffle the pairs, then write them to the db in encoded form.
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	rnd.Shuffle(len(pairs), func(i, j int) {
		pairs[i], pairs[j] = pairs[j], pairs[i]
	})
	storeDir := t.TempDir()
	db, err := pebble.Open(filepath.Join(storeDir, "kv"), &pebble.Options{})
	require.NoError(t, err)
	wb := db.NewBatch()
	for _, p := range pairs {
		key := keyAdapter.Encode(nil, p.Key, p.RowID)
		require.NoError(t, wb.Set(key, p.Val, nil))
	}
	require.NoError(t, wb.Commit(pebble.Sync))
	dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
	require.NoError(t, err)
	pool := membuf.NewPool()
	defer pool.Destroy()
	iter := newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), common.DupDetectOpt{}, pool.NewBuffer())
	// Sort the pairs by encoded key so that pairs[0] and pairs[len(pairs)-1]
	// line up with the iterator's First() and Last() positions.
	sort.Slice(pairs, func(i, j int) bool {
		key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID)
		key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID)
		return bytes.Compare(key1, key2) < 0
	})
	// Verify the first pair.
	require.True(t, iter.First())
	require.True(t, iter.Valid())
	require.Equal(t, pairs[0].Key, iter.Key())
	require.Equal(t, pairs[0].Val, iter.Value())
	// Verify the last pair.
	require.True(t, iter.Last())
	require.True(t, iter.Valid())
	require.Equal(t, pairs[len(pairs)-1].Key, iter.Key())
	require.Equal(t, pairs[len(pairs)-1].Val, iter.Value())
	// Iterate all keys and check that every unique key shows up exactly
	// once, in order.
	for iter.First(); iter.Valid(); iter.Next() {
		require.Equal(t, uniqueKeys[0], iter.Key())
		uniqueKeys = uniqueKeys[1:]
	}
	require.NoError(t, iter.Error())
	require.Equal(t, 0, len(uniqueKeys))
	require.NoError(t, iter.Close())
	require.NoError(t, db.Close())
	// Check the duplicates recorded by dupDetectIter.
	iter2 := newDupDBIter(dupDB, keyAdapter, &pebble.IterOptions{})
	var detectedPairs []common.KvPair
	for iter2.First(); iter2.Valid(); iter2.Next() {
		detectedPairs = append(detectedPairs, common.KvPair{
			Key: slices.Clone(iter2.Key()),
			Val: slices.Clone(iter2.Value()),
		})
	}
	require.NoError(t, iter2.Error())
	require.NoError(t, iter2.Close())
	require.NoError(t, dupDB.Close())
	require.Equal(t, len(dupPairs), len(detectedPairs))
	// Sort both sides by (key, value) and compare them pairwise.
	sort.Slice(dupPairs, func(i, j int) bool {
		keyCmp := bytes.Compare(dupPairs[i].Key, dupPairs[j].Key)
		return keyCmp < 0 || keyCmp == 0 && bytes.Compare(dupPairs[i].Val, dupPairs[j].Val) < 0
	})
	sort.Slice(detectedPairs, func(i, j int) bool {
		keyCmp := bytes.Compare(detectedPairs[i].Key, detectedPairs[j].Key)
		return keyCmp < 0 || keyCmp == 0 && bytes.Compare(detectedPairs[i].Val, detectedPairs[j].Val) < 0
	})
	for i := range detectedPairs {
		require.Equal(t, dupPairs[i].Key, detectedPairs[i].Key)
		require.Equal(t, dupPairs[i].Val, detectedPairs[i].Val)
	}
}
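
// TestKeyAdapterEncoding checks that DupDetectKeyAdapter round-trips a key
// through Encode and Decode for both an int-handle row ID and a
// common-handle row ID.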
func TestKeyAdapterEncoding(t *testing.T) {
	keyAdapter := common.DupDetectKeyAdapter{}
	srcKey := []byte{1, 2, 3}
	// Round-trip with an int-handle row ID.
	v := keyAdapter.Encode(nil, srcKey, common.EncodeIntRowID(1))
	resKey, err := keyAdapter.Decode(nil, v)
	require.NoError(t, err)
	require.EqualValues(t, srcKey, resKey)
	// Round-trip with a common-handle row ID.
	v = keyAdapter.Encode(nil, srcKey, []byte("mock_common_handle"))
	resKey, err = keyAdapter.Decode(nil, v)
	require.NoError(t, err)
	require.EqualValues(t, srcKey, resKey)
}
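
// BenchmarkDupDetectIter measures a full scan of dupDetectIter over 100,000
// encoded keys, roughly 20% of which duplicate their predecessor.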
func BenchmarkDupDetectIter(b *testing.B) {
	keyAdapter := common.DupDetectKeyAdapter{}
	db, err := pebble.Open(filepath.Join(b.TempDir(), "kv"), &pebble.Options{})
	require.NoError(b, err)
	wb := db.NewBatch()
	val := []byte("value")
	for i := range 100_000 {
		keyNum := i
		// Mimic roughly 20% duplication by reusing the previous key for
		// every fifth entry.
		if keyNum%5 == 0 {
			keyNum--
		}
		keyStr := fmt.Sprintf("%09d", keyNum)
		rowID := strconv.Itoa(i)
		key := keyAdapter.Encode(nil, []byte(keyStr), []byte(rowID))
		require.NoError(b, wb.Set(key, val, nil))
	}
	require.NoError(b, wb.Commit(pebble.Sync))
	pool := membuf.NewPool()
	defer pool.Destroy()
	dupDB, err := pebble.Open(filepath.Join(b.TempDir(), "dup"), &pebble.Options{})
	require.NoError(b, err)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		iter := newDupDetectIter(
			db,
			keyAdapter,
			&pebble.IterOptions{},
			dupDB,
			log.L(),
			common.DupDetectOpt{},
			pool.NewBuffer(),
		)
		keyCnt := 0
		for iter.First(); iter.Valid(); iter.Next() {
			keyCnt++
		}
		iter.Close()
	}
}