// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external

import (
	"context"
	"fmt"
	"slices"
	"testing"
	"time"

	"github.com/docker/go-units"
	"github.com/pingcap/tidb/pkg/ingestor/engineapi"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
	"golang.org/x/sync/errgroup"
)
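
// testGetFirstAndLastKey asserts that GetFirstAndLastKey on data returns the
// expected first and last keys within [lowerBound, upperBound).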
func testGetFirstAndLastKey(
	t *testing.T,
	data engineapi.IngestData,
	lowerBound, upperBound []byte,
	expectedFirstKey, expectedLastKey []byte,
) {
	firstKey, lastKey, err := data.GetFirstAndLastKey(lowerBound, upperBound)
	require.NoError(t, err)
	require.Equal(t, expectedFirstKey, firstKey)
	require.Equal(t, expectedLastKey, lastKey)
}
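
// testNewIter iterates data over [lowerBound, upperBound) and asserts that the
// iterator yields exactly expectedKVs.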
func testNewIter(
	t *testing.T,
	data engineapi.IngestData,
	lowerBound, upperBound []byte,
	expectedKVs []KVPair,
) {
	ctx := context.Background()
	iter := data.NewIter(ctx, lowerBound, upperBound, nil)
	var kvs []KVPair
	for iter.First(); iter.Valid(); iter.Next() {
		require.NoError(t, iter.Error())
		kvs = append(kvs, KVPair{Key: iter.Key(), Value: iter.Value()})
	}
	require.NoError(t, iter.Error())
	require.NoError(t, iter.Close())
	require.Equal(t, expectedKVs, kvs)
}
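
// TestMemoryIngestData exercises GetFirstAndLastKey and NewIter on
// MemoryIngestData, first with unique keys and then with duplicated keys.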
func TestMemoryIngestData(t *testing.T) {
	kvs := []KVPair{
		{Key: []byte("key1"), Value: []byte("value1")},
		{Key: []byte("key2"), Value: []byte("value2")},
		{Key: []byte("key3"), Value: []byte("value3")},
		{Key: []byte("key4"), Value: []byte("value4")},
		{Key: []byte("key5"), Value: []byte("value5")},
	}
	data := &MemoryIngestData{
		kvs: kvs,
		ts:  123,
	}
	require.EqualValues(t, 123, data.GetTS())
	testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
	testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
	testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key35"), []byte("key3"), []byte("key3"))
	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key26"), nil, nil)
	testGetFirstAndLastKey(t, data, []byte("key0"), []byte("key1"), nil, nil)
	testGetFirstAndLastKey(t, data, []byte("key6"), []byte("key9"), nil, nil)

	testNewIter(t, data, nil, nil, kvs)
	testNewIter(t, data, []byte("key1"), []byte("key6"), kvs)
	testNewIter(t, data, []byte("key2"), []byte("key5"), kvs[1:4])
	testNewIter(t, data, []byte("key25"), []byte("key35"), kvs[2:3])
	testNewIter(t, data, []byte("key25"), []byte("key26"), nil)
	testNewIter(t, data, []byte("key0"), []byte("key1"), nil)
	testNewIter(t, data, []byte("key6"), []byte("key9"), nil)

	data = &MemoryIngestData{
		ts: 234,
	}
	encodedKVs := make([]KVPair, 0, len(kvs)*2)
	duplicatedKVs := make([]KVPair, 0, len(kvs)*2)
	for i := range kvs {
		encodedKey := slices.Clone(kvs[i].Key)
		encodedKVs = append(encodedKVs, KVPair{Key: encodedKey, Value: kvs[i].Value})
		if i%2 == 0 {
			continue
		}
		// for odd-indexed keys, record two duplicated pairs under the same key:
		// the original value and a copy with an extra 0x01 byte appended, e.g.
		// key2/value2 and key2/value2\x01, key4/value4 and key4/value4\x01
		duplicatedKVs = append(duplicatedKVs, KVPair{Key: encodedKey, Value: kvs[i].Value})
		encodedKey = slices.Clone(kvs[i].Key)
		newValues := make([]byte, len(kvs[i].Value)+1)
		copy(newValues, kvs[i].Value)
		newValues[len(kvs[i].Value)] = 1
		encodedKVs = append(encodedKVs, KVPair{Key: encodedKey, Value: newValues})
		duplicatedKVs = append(duplicatedKVs, KVPair{Key: encodedKey, Value: newValues})
	}
	data.kvs = encodedKVs
	require.EqualValues(t, 234, data.GetTS())
	testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
	testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
	testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key35"), []byte("key3"), []byte("key3"))
	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key26"), nil, nil)
	testGetFirstAndLastKey(t, data, []byte("key0"), []byte("key1"), nil, nil)
	testGetFirstAndLastKey(t, data, []byte("key6"), []byte("key9"), nil, nil)
}
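
// prepareKVFiles writes each slice of contents through an external writer and
// returns the resulting data and stat file paths, one pair per slice.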
func prepareKVFiles(t *testing.T, store storeapi.Storage, contents [][]KVPair) (dataFiles, statFiles []string) {
	ctx := context.Background()
	for i, c := range contents {
		var summary *WriterSummary
		// we want each content slice to produce exactly one file, so make the
		// size limits below large enough to avoid flushing multiple files.
		writer := NewWriterBuilder().SetPropKeysDistance(4).
			SetMemorySizeLimit(8*units.MiB).SetBlockSize(8*units.MiB).
			SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
			Build(store, "/test", fmt.Sprintf("%d", i))
		for _, p := range c {
			require.NoError(t, writer.WriteRow(ctx, p.Key, p.Value, nil))
		}
		require.NoError(t, writer.Close(ctx))
		require.Len(t, summary.MultipleFilesStats, 1)
		require.Len(t, summary.MultipleFilesStats[0].Filenames, 1)
		require.Zero(t, summary.ConflictInfo.Count)
		require.Empty(t, summary.ConflictInfo.Files)
		dataFiles = append(dataFiles, summary.MultipleFilesStats[0].Filenames[0][0])
		statFiles = append(statFiles, summary.MultipleFilesStats[0].Filenames[0][1])
	}
	return
}
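
// getAllDataFromDataAndRanges drains the iterator of a DataAndRanges and
// returns every key-value pair it yields.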
func getAllDataFromDataAndRanges(t *testing.T, dataAndRanges *engineapi.DataAndRanges) []KVPair {
	ctx := context.Background()
	iter := dataAndRanges.Data.NewIter(ctx, nil, nil, membuf.NewPool())
	var allKVs []KVPair
	for iter.First(); iter.Valid(); iter.Next() {
		allKVs = append(allKVs, KVPair{Key: iter.Key(), Value: iter.Value()})
	}
	require.NoError(t, iter.Close())
	return allKVs
}
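
// TestEngineOnDup covers LoadIngestData under each OnDuplicateKey strategy:
// Ignore and Error make the load fail on duplicate keys, while Record and
// Remove drop all occurrences of duplicated keys from the loaded data, with
// Record additionally persisting the dropped pairs to conflict files.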
func TestEngineOnDup(t *testing.T) {
	ctx := context.Background()
	contents := [][]KVPair{{
		{Key: []byte{4}, Value: []byte("bbb")},
		{Key: []byte{4}, Value: []byte("bbb")},
		{Key: []byte{1}, Value: []byte("aa")},
		{Key: []byte{1}, Value: []byte("aa")},
		{Key: []byte{1}, Value: []byte("aa")},
		{Key: []byte{2}, Value: []byte("vv")},
		{Key: []byte{3}, Value: []byte("sds")},
	}}
	getEngineFn := func(store storeapi.Storage, onDup engineapi.OnDuplicateKey, inDataFiles, inStatFiles []string) *Engine {
		return NewExternalEngine(
			ctx,
			store, inDataFiles, inStatFiles,
			[]byte{1}, []byte{5},
			[][]byte{{1}, {2}, {3}, {4}, {5}},
			[][]byte{{1}, {3}, {5}},
			10,
			123,
			456,
			789,
			true,
			16*units.GiB,
			onDup,
			"/",
		)
	}
t.Run("on duplicate ignore", func(t *testing.T) {
onDup := engineapi.OnDuplicateKeyIgnore
store := objstore.NewMemStorage()
dataFiles, statFiles := prepareKVFiles(t, store, contents)
extEngine := getEngineFn(store, onDup, dataFiles, statFiles)
loadDataCh := make(chan engineapi.DataAndRanges, 4)
require.ErrorContains(t, extEngine.LoadIngestData(ctx, loadDataCh), "duplicate key found")
t.Cleanup(func() {
require.NoError(t, extEngine.Close())
})
})
t.Run("on duplicate error", func(t *testing.T) {
onDup := engineapi.OnDuplicateKeyError
store := objstore.NewMemStorage()
dataFiles, statFiles := prepareKVFiles(t, store, contents)
extEngine := getEngineFn(store, onDup, dataFiles, statFiles)
loadDataCh := make(chan engineapi.DataAndRanges, 4)
require.ErrorContains(t, extEngine.LoadIngestData(ctx, loadDataCh), "[Lightning:Restore:ErrFoundDuplicateKey]found duplicate key '\x01', value 'aa'")
t.Cleanup(func() {
require.NoError(t, extEngine.Close())
})
})
t.Run("on duplicate record or remove, no duplicates", func(t *testing.T) {
for _, od := range []engineapi.OnDuplicateKey{engineapi.OnDuplicateKeyRecord, engineapi.OnDuplicateKeyRemove} {
store := objstore.NewMemStorage()
dfiles, sfiles := prepareKVFiles(t, store, [][]KVPair{{
{Key: []byte{4}, Value: []byte("bbb")},
{Key: []byte{1}, Value: []byte("aa")},
{Key: []byte{2}, Value: []byte("vv")},
{Key: []byte{3}, Value: []byte("sds")},
}})
extEngine := getEngineFn(store, od, dfiles, sfiles)
loadDataCh := make(chan engineapi.DataAndRanges, 4)
require.NoError(t, extEngine.LoadIngestData(ctx, loadDataCh))
t.Cleanup(func() {
require.NoError(t, extEngine.Close())
})
require.Len(t, loadDataCh, 1)
dataAndRanges := <-loadDataCh
allKVs := getAllDataFromDataAndRanges(t, &dataAndRanges)
require.EqualValues(t, []KVPair{
{Key: []byte{1}, Value: []byte("aa")},
{Key: []byte{2}, Value: []byte("vv")},
{Key: []byte{3}, Value: []byte("sds")},
{Key: []byte{4}, Value: []byte("bbb")},
}, allKVs)
info := extEngine.ConflictInfo()
require.Zero(t, info.Count)
require.Empty(t, info.Files)
}
})
t.Run("on duplicate record or remove, partial duplicated", func(t *testing.T) {
contents2 := [][]KVPair{
{{Key: []byte{1}, Value: []byte("aa")}, {Key: []byte{1}, Value: []byte("aa")}},
{{Key: []byte{1}, Value: []byte("aa")}, {Key: []byte{2}, Value: []byte("vv")}, {Key: []byte{3}, Value: []byte("sds")}},
{{Key: []byte{4}, Value: []byte("bbb")}, {Key: []byte{4}, Value: []byte("bbb")}},
}
for _, cont := range [][][]KVPair{contents, contents2} {
for _, od := range []engineapi.OnDuplicateKey{engineapi.OnDuplicateKeyRecord, engineapi.OnDuplicateKeyRemove} {
store := objstore.NewMemStorage()
dataFiles, statFiles := prepareKVFiles(t, store, cont)
extEngine := getEngineFn(store, od, dataFiles, statFiles)
loadDataCh := make(chan engineapi.DataAndRanges, 4)
require.NoError(t, extEngine.LoadIngestData(ctx, loadDataCh))
t.Cleanup(func() {
require.NoError(t, extEngine.Close())
})
require.Len(t, loadDataCh, 1)
dataAndRanges := <-loadDataCh
allKVs := getAllDataFromDataAndRanges(t, &dataAndRanges)
require.EqualValues(t, []KVPair{
{Key: []byte{2}, Value: []byte("vv")},
{Key: []byte{3}, Value: []byte("sds")},
}, allKVs)
info := extEngine.ConflictInfo()
if od == engineapi.OnDuplicateKeyRemove {
require.Zero(t, info.Count)
require.Empty(t, info.Files)
} else {
require.EqualValues(t, 5, info.Count)
require.Len(t, info.Files, 1)
dupPairs := readKVFile(t, store, info.Files[0])
require.EqualValues(t, []KVPair{
{Key: []byte{1}, Value: []byte("aa")},
{Key: []byte{1}, Value: []byte("aa")},
{Key: []byte{1}, Value: []byte("aa")},
{Key: []byte{4}, Value: []byte("bbb")},
{Key: []byte{4}, Value: []byte("bbb")},
}, dupPairs)
}
}
}
})
t.Run("on duplicate record or remove, all duplicated", func(t *testing.T) {
for _, od := range []engineapi.OnDuplicateKey{engineapi.OnDuplicateKeyRecord, engineapi.OnDuplicateKeyRemove} {
store := objstore.NewMemStorage()
dfiles, sfiles := prepareKVFiles(t, store, [][]KVPair{{
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
}})
extEngine := getEngineFn(store, od, dfiles, sfiles)
loadDataCh := make(chan engineapi.DataAndRanges, 4)
require.NoError(t, extEngine.LoadIngestData(ctx, loadDataCh))
t.Cleanup(func() {
require.NoError(t, extEngine.Close())
})
require.Len(t, loadDataCh, 1)
dataAndRanges := <-loadDataCh
allKVs := getAllDataFromDataAndRanges(t, &dataAndRanges)
require.Empty(t, allKVs)
info := extEngine.ConflictInfo()
if od == engineapi.OnDuplicateKeyRemove {
require.Zero(t, info.Count)
require.Empty(t, info.Files)
} else {
require.EqualValues(t, 4, info.Count)
require.Len(t, info.Files, 1)
dupPairs := readKVFile(t, store, info.Files[0])
require.EqualValues(t, []KVPair{
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
{Key: []byte{1}, Value: []byte("aaa")},
}, dupPairs)
}
}
})
}
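
// dummyWorker is a no-op worker pool used to satisfy Engine.SetWorkerPool in
// tests; Tune does nothing.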
type dummyWorker struct{}

func (w *dummyWorker) Tune(int32, bool) {}
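
// TestChangeEngineConcurrency verifies that Engine.UpdateResource can adjust
// the worker concurrency both while data is still being loaded and after all
// data has been loaded.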
func TestChangeEngineConcurrency(t *testing.T) {
	var (
		outCh     chan engineapi.DataAndRanges
		eg        errgroup.Group
		e         *Engine
		finished  atomic.Int32
		updatedCh chan struct{}
	)
	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/lightning/backend/external/mockLoadBatchRegionData", "return(true)")
	resetFn := func() {
		outCh = make(chan engineapi.DataAndRanges, 4)
		updatedCh = make(chan struct{})
		eg = errgroup.Group{}
		finished.Store(0)
		e = &Engine{
			jobKeys:           make([][]byte, 64),
			workerConcurrency: *atomic.NewInt32(4),
			readyCh:           make(chan struct{}),
		}
		e.SetWorkerPool(&dummyWorker{})
		// Load and consume the data
		eg.Go(func() error {
			defer close(outCh)
			return e.LoadIngestData(context.Background(), outCh)
		})
		eg.Go(func() error {
			<-updatedCh
			for data := range outCh {
				data.Data.DecRef()
				finished.Add(1)
			}
			return nil
		})
	}
t.Run("reduce concurrency", func(t *testing.T) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/lightning/backend/external/afterUpdateWorkerConcurrency", func() {
updatedCh <- struct{}{}
})
resetFn()
// Make sure update concurrency is triggered.
require.Eventually(t, func() bool {
return len(outCh) >= 4
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, e.UpdateResource(context.Background(), 1, 1024))
require.NoError(t, eg.Wait())
})
t.Run("increase concurrency", func(t *testing.T) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/lightning/backend/external/afterUpdateWorkerConcurrency", func() {
updatedCh <- struct{}{}
})
resetFn()
// Make sure update concurrency is triggered.
require.Eventually(t, func() bool {
return len(outCh) >= 4
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, e.UpdateResource(context.Background(), 8, 1024))
require.NoError(t, eg.Wait())
})
t.Run("increase concurrency after loading all data", func(t *testing.T) {
resetFn()
close(updatedCh)
// Wait all the data being processed
require.Eventually(t, func() bool {
return finished.Load() >= 16
}, 3*time.Second, 10*time.Millisecond)
require.NoError(t, e.UpdateResource(context.Background(), 8, 1024))
require.NoError(t, eg.Wait())
})
}