// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"bytes"
	"context"
	"math"
	"testing"

	"github.com/jfcg/sorty/v2"
	dbkv "github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/stretchr/testify/require"
)

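// mockOneMultiFileStat pairs each data file with its corresponding stat file
// and wraps all pairs into a single MultipleFilesStat.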
func mockOneMultiFileStat(data, stat []string) []MultipleFilesStat {
	m := MultipleFilesStat{}
	for i := range data {
		m.Filenames = append(m.Filenames, [2]string{data[i], stat[i]})
	}
	return []MultipleFilesStat{m}
}

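// testReadAndCompare uses a RangeSplitter to cut the key space into range
// groups, reads every group back from the store, sorts what was loaded, and
// checks that it matches the expected kvs in order.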
func testReadAndCompare(
	ctx context.Context,
	t *testing.T,
	kvs []common.KvPair,
	store storeapi.Storage,
	datas []string,
	stats []string,
	startKey dbkv.Key,
	memSizeLimit int) {

	splitter, err := NewRangeSplitter(
		ctx,
		mockOneMultiFileStat(datas, stats),
		store,
		int64(memSizeLimit), // make the group small for testing
		math.MaxInt64,
		4*1024*1024*1024,
		math.MaxInt64,
		math.MaxInt64,
		math.MaxInt64,
	)
	require.NoError(t, err)

	bufPool := membuf.NewPool()
	loaded := &memKVsAndBuffers{}
	curStart := startKey.Clone()
	kvIdx := 0

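	// process one range group at a time: read the group's data back from the
	// store, sort it, and compare it against the matching slice of the
	// expected kvs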
	for {
		endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, _, err := splitter.SplitOneRangesGroup()
		require.NoError(t, err)
		curEnd := dbkv.Key(endKeyOfGroup).Clone()
		// an empty end key marks the last group; cap it just past the largest
		// expected key so the final read covers all remaining data
		if len(endKeyOfGroup) == 0 {
			curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next()
		}

		err = readAllData(
			ctx,
			store,
			dataFilesOfGroup,
			statFilesOfGroup,
			curStart,
			curEnd,
			bufPool,
			bufPool,
			loaded,
		)
		require.NoError(t, err)
		loaded.build(ctx)

		// sort the loaded kvs so they can be compared against the expected,
		// already-sorted kvs
		sorty.MaxGor = uint64(8)
		sorty.Sort(len(loaded.kvs), func(i, k, r, s int) bool {
			if bytes.Compare(loaded.kvs[i].Key, loaded.kvs[k].Key) < 0 { // strict comparator like < or >
				if r != s {
					loaded.kvs[r], loaded.kvs[s] = loaded.kvs[s], loaded.kvs[r]
				}
				return true
			}
			return false
		})
		for _, kv := range loaded.kvs {
			require.EqualValues(t, kvs[kvIdx].Key, kv.Key)
			require.EqualValues(t, kvs[kvIdx].Val, kv.Value)
			kvIdx++
		}
		curStart = curEnd.Clone()

		// release the loaded kvs and buffers before reading the next group
		loaded.kvs = nil
		loaded.memKVBuffers = nil

		if len(endKeyOfGroup) == 0 {
			break
		}
	}
	err = splitter.Close()
	require.NoError(t, err)
}

// splitDataAndStatFiles splits data and stat files into groups for the merge
// step, mimicking the scheduler logic of the merge-sort step in "add index"
// and "import into".
func splitDataAndStatFiles(datas []string, stats []string) ([][]string, [][]string) {
	dataGroup := make([][]string, 0, 10)
	statGroup := make([][]string, 0, 10)

	start := 0
	step := 10
	for start < len(datas) {
		end := min(start+step, len(datas))
		dataGroup = append(dataGroup, datas[start:end])
		statGroup = append(statGroup, stats[start:end])
		start = end
	}
	return dataGroup, statGroup
}

// splitDataStatAndKeys splits data and stat files into groups for the new
// merge step, and computes the start/end key of each group.
func splitDataStatAndKeys(datas []string, stats []string, multiStats []MultipleFilesStat) ([][]string, [][]string, []dbkv.Key, []dbkv.Key) {
	startKeys := make([]dbkv.Key, 0, 10)
	endKeys := make([]dbkv.Key, 0, 10)
	i := 0
	// pair up MultipleFilesStat two at a time and take the union of their key
	// ranges; the end key is exclusive, hence Next()
	for ; i < len(multiStats)-1; i += 2 {
		startKey := BytesMin(multiStats[i].MinKey, multiStats[i+1].MinKey)
		endKey := BytesMax(multiStats[i].MaxKey, multiStats[i+1].MaxKey)
		endKey = dbkv.Key(endKey).Next().Clone()
		startKeys = append(startKeys, startKey)
		endKeys = append(endKeys, endKey)
	}
	// handle the odd leftover MultipleFilesStat, if any
	if i == len(multiStats)-1 {
		startKeys = append(startKeys, multiStats[i].MinKey.Clone())
		endKeys = append(endKeys, multiStats[i].MaxKey.Next().Clone())
	}

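	// group the file names to match the key ranges computed above: each group
	// covers two MultipleFilesStat, i.e. at most 2*multiFileStatNum files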
	dataGroup := make([][]string, 0, 10)
	statGroup := make([][]string, 0, 10)

	start := 0
	step := 2 * multiFileStatNum
	for start < len(datas) {
		end := min(start+step, len(datas))
		dataGroup = append(dataGroup, datas[start:end])
		statGroup = append(statGroup, stats[start:end])
		start = end
	}
	return dataGroup, statGroup, startKeys, endKeys
}