310 lines
8.5 KiB
Go
310 lines
8.5 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package external
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"slices"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
|
|
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
|
|
dbkv "github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/objstore"
|
|
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
|
|
"github.com/pingcap/tidb/pkg/util/size"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/exp/rand"
|
|
)
|
|
|
|
func changePropDist(t *testing.T, sizeDist, keysDist uint64) {
|
|
sizeDistBak := defaultPropSizeDist
|
|
keysDistBak := defaultPropKeysDist
|
|
t.Cleanup(func() {
|
|
defaultPropSizeDist = sizeDistBak
|
|
defaultPropKeysDist = keysDistBak
|
|
})
|
|
defaultPropSizeDist = sizeDist
|
|
defaultPropKeysDist = keysDist
|
|
}
|
|
|
|
func TestGlobalSortLocalBasic(t *testing.T) {
|
|
// 1. write data step
|
|
seed := time.Now().Unix()
|
|
rand.Seed(uint64(seed))
|
|
t.Logf("seed: %d", seed)
|
|
ctx := context.Background()
|
|
memStore := objstore.NewMemStorage()
|
|
memSizeLimit := (rand.Intn(10) + 1) * 400
|
|
lastStepDatas := make([]string, 0, 10)
|
|
lastStepStats := make([]string, 0, 10)
|
|
var startKey, endKey dbkv.Key
|
|
|
|
onWriterClose := func(s *WriterSummary) {
|
|
for _, stat := range s.MultipleFilesStats {
|
|
for i := range stat.Filenames {
|
|
lastStepDatas = append(lastStepDatas, stat.Filenames[i][0])
|
|
lastStepStats = append(lastStepStats, stat.Filenames[i][1])
|
|
}
|
|
}
|
|
if len(startKey) == 0 && len(endKey) == 0 {
|
|
startKey = s.Min.Clone()
|
|
endKey = s.Max.Clone().Next()
|
|
}
|
|
startKey = BytesMin(startKey, s.Min.Clone())
|
|
endKey = BytesMax(endKey, s.Max.Clone().Next())
|
|
}
|
|
|
|
w := NewWriterBuilder().
|
|
SetPropSizeDistance(100).
|
|
SetPropKeysDistance(2).
|
|
SetMemorySizeLimit(uint64(memSizeLimit)).
|
|
SetBlockSize(memSizeLimit).
|
|
SetOnCloseFunc(onWriterClose).
|
|
Build(memStore, "/test", "0")
|
|
|
|
writer := NewEngineWriter(w)
|
|
kvCnt := rand.Intn(10) + 10000
|
|
kvs := make([]common.KvPair, kvCnt)
|
|
for i := range kvCnt {
|
|
kvs[i] = common.KvPair{
|
|
Key: []byte(uuid.New().String()),
|
|
Val: []byte("56789"),
|
|
}
|
|
}
|
|
slices.SortFunc(kvs, func(i, j common.KvPair) int {
|
|
return bytes.Compare(i.Key, j.Key)
|
|
})
|
|
|
|
require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
|
|
_, err := writer.Close(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// 2. read and sort step
|
|
testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit)
|
|
}
|
|
|
|
func TestGlobalSortLocalWithMerge(t *testing.T) {
|
|
changePropDist(t, 100, 2)
|
|
// 1. write data step
|
|
seed := time.Now().Unix()
|
|
rand.Seed(uint64(seed))
|
|
t.Logf("seed: %d", seed)
|
|
ctx := context.Background()
|
|
memStore := objstore.NewMemStorage()
|
|
memSizeLimit := (rand.Intn(10) + 1) * 400
|
|
|
|
w := NewWriterBuilder().
|
|
SetMemorySizeLimit(uint64(memSizeLimit)).
|
|
SetBlockSize(memSizeLimit).
|
|
Build(memStore, "/test", "0")
|
|
|
|
writer := NewEngineWriter(w)
|
|
kvCnt := rand.Intn(10) + 10000
|
|
kvSize := 0
|
|
kvs := make([]common.KvPair, kvCnt)
|
|
for i := range kvCnt {
|
|
key := []byte(uuid.New().String())
|
|
val := []byte("56789")
|
|
kvSize += len(key) + len(val)
|
|
kvs[i] = common.KvPair{
|
|
Key: key,
|
|
Val: val,
|
|
}
|
|
}
|
|
|
|
slices.SortFunc(kvs, func(i, j common.KvPair) int {
|
|
return bytes.Compare(i.Key, j.Key)
|
|
})
|
|
|
|
require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
|
|
_, err := writer.Close(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// 2. merge step
|
|
datas, stats, err := getKVAndStatFilesByScan(ctx, memStore, "test")
|
|
require.NoError(t, err)
|
|
|
|
dataGroup, _ := splitDataAndStatFiles(datas, stats)
|
|
|
|
lastStepDatas := make([]string, 0, 10)
|
|
lastStepStats := make([]string, 0, 10)
|
|
var startKey, endKey dbkv.Key
|
|
|
|
collector := &execute.TestCollector{}
|
|
|
|
onWriterClose := func(s *WriterSummary) {
|
|
for _, stat := range s.MultipleFilesStats {
|
|
for i := range stat.Filenames {
|
|
lastStepDatas = append(lastStepDatas, stat.Filenames[i][0])
|
|
lastStepStats = append(lastStepStats, stat.Filenames[i][1])
|
|
}
|
|
|
|
}
|
|
if len(startKey) == 0 && len(endKey) == 0 {
|
|
startKey = s.Min.Clone()
|
|
endKey = s.Max.Clone().Next()
|
|
}
|
|
startKey = BytesMin(startKey, s.Min.Clone())
|
|
endKey = BytesMax(endKey, s.Max.Clone().Next())
|
|
}
|
|
mergeMemSize := (rand.Intn(10) + 1) * 100
|
|
// use random mergeMemSize to test different memLimit of writer.
|
|
// reproduce one bug, see https://github.com/pingcap/tidb/issues/49590
|
|
bufSizeBak := DefaultReadBufferSize
|
|
memLimitBak := defaultOneWriterMemSizeLimit
|
|
t.Cleanup(func() {
|
|
DefaultReadBufferSize = bufSizeBak
|
|
defaultOneWriterMemSizeLimit = memLimitBak
|
|
})
|
|
DefaultReadBufferSize = 100
|
|
defaultOneWriterMemSizeLimit = uint64(mergeMemSize)
|
|
|
|
for _, group := range dataGroup {
|
|
wctx := workerpool.NewContext(ctx)
|
|
op := NewMergeOperator(
|
|
wctx,
|
|
memStore,
|
|
int64(5*size.MB),
|
|
"/test2",
|
|
mergeMemSize,
|
|
onWriterClose,
|
|
collector,
|
|
1,
|
|
true,
|
|
engineapi.OnDuplicateKeyIgnore,
|
|
)
|
|
|
|
require.NoError(t, MergeOverlappingFiles(
|
|
wctx,
|
|
group,
|
|
1,
|
|
op,
|
|
))
|
|
}
|
|
|
|
require.EqualValues(t, kvCnt, collector.Rows.Load())
|
|
require.EqualValues(t, kvSize, collector.Bytes.Load())
|
|
|
|
// 3. read and sort step
|
|
testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit)
|
|
}
|
|
|
|
func TestGlobalSortLocalWithMergeV2(t *testing.T) {
|
|
// 1. write data step
|
|
seed := time.Now().Unix()
|
|
rand.Seed(uint64(seed))
|
|
t.Logf("seed: %d", seed)
|
|
ctx := context.Background()
|
|
memStore := objstore.NewMemStorage()
|
|
memSizeLimit := (rand.Intn(10) + 1) * 400
|
|
multiStats := make([]MultipleFilesStat, 0, 100)
|
|
randomSize := (rand.Intn(500) + 1) * 1000
|
|
|
|
failpoint.Enable("github.com/pingcap/tidb/pkg/lightning/backend/external/mockRangesGroupSize",
|
|
"return("+strconv.Itoa(randomSize)+")")
|
|
t.Cleanup(func() {
|
|
failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/backend/external/mockRangesGroupSize")
|
|
})
|
|
datas := make([]string, 0, 100)
|
|
stats := make([]string, 0, 100)
|
|
// prepare meta for merge step.
|
|
closeFn := func(s *WriterSummary) {
|
|
multiStats = append(multiStats, s.MultipleFilesStats...)
|
|
for _, stat := range s.MultipleFilesStats {
|
|
for i := range stat.Filenames {
|
|
datas = append(datas, stat.Filenames[i][0])
|
|
stats = append(stats, stat.Filenames[i][1])
|
|
}
|
|
}
|
|
}
|
|
|
|
w := NewWriterBuilder().
|
|
SetPropSizeDistance(100).
|
|
SetPropKeysDistance(2).
|
|
SetMemorySizeLimit(uint64(memSizeLimit)).
|
|
SetBlockSize(memSizeLimit).
|
|
SetOnCloseFunc(closeFn).
|
|
Build(memStore, "/test", "0")
|
|
|
|
writer := NewEngineWriter(w)
|
|
kvCnt := rand.Intn(10) + 10000
|
|
kvs := make([]common.KvPair, kvCnt)
|
|
for i := range kvCnt {
|
|
kvs[i] = common.KvPair{
|
|
Key: []byte(uuid.New().String()),
|
|
Val: []byte("56789"),
|
|
}
|
|
}
|
|
slices.SortFunc(kvs, func(i, j common.KvPair) int {
|
|
return bytes.Compare(i.Key, j.Key)
|
|
})
|
|
require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
|
|
_, err := writer.Close(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// 2. merge step
|
|
dataGroup, statGroup, startKeys, endKeys := splitDataStatAndKeys(datas, stats, multiStats)
|
|
lastStepDatas := make([]string, 0, 10)
|
|
lastStepStats := make([]string, 0, 10)
|
|
var startKey, endKey dbkv.Key
|
|
|
|
// prepare meta for last step.
|
|
closeFn1 := func(s *WriterSummary) {
|
|
for _, stat := range s.MultipleFilesStats {
|
|
for i := range stat.Filenames {
|
|
lastStepDatas = append(lastStepDatas, stat.Filenames[i][0])
|
|
lastStepStats = append(lastStepStats, stat.Filenames[i][1])
|
|
}
|
|
|
|
}
|
|
if len(startKey) == 0 && len(endKey) == 0 {
|
|
startKey = s.Min.Clone()
|
|
endKey = s.Max.Clone().Next()
|
|
}
|
|
startKey = BytesMin(startKey, s.Min.Clone())
|
|
endKey = BytesMax(endKey, s.Max.Clone().Next())
|
|
}
|
|
|
|
for i, group := range dataGroup {
|
|
require.NoError(t, MergeOverlappingFilesV2(
|
|
ctx,
|
|
mockOneMultiFileStat(group, statGroup[i]),
|
|
memStore,
|
|
startKeys[i],
|
|
endKeys[i],
|
|
int64(5*size.MB),
|
|
"/test2",
|
|
uuid.NewString(),
|
|
100,
|
|
8*1024,
|
|
100,
|
|
2,
|
|
closeFn1,
|
|
1,
|
|
true))
|
|
}
|
|
|
|
// 3. read and sort step
|
|
testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit)
|
|
}
|