// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package external import ( "bytes" "context" "fmt" "math" "net/http" _ "net/http/pprof" "slices" "sort" "testing" "time" "github.com/docker/go-units" "github.com/google/uuid" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tidb/pkg/objstore" "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" ) func TestGeneralProperties(t *testing.T) { seed := time.Now().Unix() rand.Seed(uint64(seed)) t.Logf("seed: %d", seed) ctx := context.Background() memStore := objstore.NewMemStorage() kvNum := rand.Intn(1000) + 100 keys := make([][]byte, kvNum) values := make([][]byte, kvNum) for i := range keys { keyLen := rand.Intn(100) + 1 valueLen := rand.Intn(100) + 1 keys[i] = make([]byte, keyLen+2) values[i] = make([]byte, valueLen+2) rand.Read(keys[i][:keyLen]) rand.Read(values[i][:valueLen]) keys[i][keyLen] = byte(i / 255) keys[i][keyLen+1] = byte(i % 255) values[i][valueLen] = byte(i / 255) values[i][valueLen+1] = byte(i % 255) } dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) require.NoError(t, err) multiFileStat := mockOneMultiFileStat(dataFiles, statFiles) splitter, err := NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 30, 1000, 1, 35, 1000, ) require.NoError(t, err) var lastEndKey []byte notExhausted: endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup() require.NoError(t, err) // endKey should be strictly greater than lastEndKey if lastEndKey != nil && endKey != nil { cmp := bytes.Compare(endKey, lastEndKey) require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey) } // check dataFiles and statFiles lenDataFiles := len(dataFiles) lenStatFiles := len(statFiles) require.Equal(t, lenDataFiles, lenStatFiles) require.Greater(t, lenDataFiles, 0) for _, toCheck := range []struct { varName string keys [][]byte }{{"rangeJobKeys", rangeJobKeys}, {"regionSplitKeys", regionSplitKeys}} { if len(toCheck.keys) > 0 { // keys should be strictly increasing for i := 1; i < len(toCheck.keys); i++ { cmp := bytes.Compare(toCheck.keys[i], toCheck.keys[i-1]) require.Equal(t, 1, cmp, "%s: %v", toCheck.varName, toCheck.keys) } // first splitKeys should be strictly greater than lastEndKey cmp := bytes.Compare(toCheck.keys[0], lastEndKey) require.Equal(t, 1, cmp, "%s: %v, lastEndKey: %v", toCheck.varName, toCheck.keys, lastEndKey) // last splitKeys should be strictly less than endKey if endKey != nil { cmp = bytes.Compare(toCheck.keys[len(toCheck.keys)-1], endKey) require.Equal(t, -1, cmp, "%s: %v, endKey: %v", toCheck.varName, toCheck.keys, endKey) } } } lastEndKey = endKey if endKey != nil { goto notExhausted } require.NoError(t, splitter.Close()) } func writeKVs(t *testing.T, writer *Writer, keys [][]byte, values [][]byte) { ctx := context.Background() for i := range keys { err := writer.WriteRow(ctx, keys[i], values[i], nil) require.NoError(t, err) } require.NoError(t, writer.Close(ctx)) } func getKVAndStatFiles(sum *WriterSummary) (dataFiles []string, statsFiles []string) { for _, ms := range sum.MultipleFilesStats { for _, f := range ms.Filenames { dataFiles = append(dataFiles, f[0]) statsFiles = append(statsFiles, f[1]) } } return } func removePartitionPrefix(t *testing.T, in []string) []string { out := make([]string, 0, len(in)) for _, s := range in { bs := []byte(s) idx := bytes.IndexByte(bs, '/') require.GreaterOrEqual(t, idx, 0) require.True(t, isValidPartition(bs[:idx])) // we include / after partition prefix in the out, as all tests have it. out = append(out, s[idx:]) } sort.Strings(out) return out } func TestOnlyOneGroup(t *testing.T) { ctx := context.Background() memStore := objstore.NewMemStorage() subDir := "/mock-test" var summary *WriterSummary writer := NewWriterBuilder(). SetMemorySizeLimit(20). SetPropSizeDistance(1). SetPropKeysDistance(1). SetOnCloseFunc(func(s *WriterSummary) { summary = s }). Build(memStore, subDir, "5") writeKVs(t, writer, [][]byte{{1}, {2}}, [][]byte{{1}, {2}}) dataFiles, statFiles := getKVAndStatFiles(summary) require.Len(t, dataFiles, 1) require.Len(t, statFiles, 1) multiFileStat := mockOneMultiFileStat(dataFiles, statFiles) splitter, err := NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 30, 1000, 10, int64(math.MaxInt64), int64(math.MaxInt64), ) require.NoError(t, err) endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Len(t, dataFiles, 1) require.Len(t, statFiles, 1) require.Len(t, rangeJobKeys, 0) require.Len(t, regionSplitKeys, 0) require.NoError(t, splitter.Close()) splitter, err = NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 30, 1000, 1, int64(math.MaxInt64), int64(math.MaxInt64), ) require.NoError(t, err) endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Len(t, dataFiles, 1) require.Len(t, statFiles, 1) require.Equal(t, [][]byte{{2}}, rangeJobKeys) require.Len(t, regionSplitKeys, 0) require.NoError(t, splitter.Close()) } func TestSortedData(t *testing.T) { ctx := context.Background() memStore := objstore.NewMemStorage() kvNum := 100 keys := make([][]byte, kvNum) values := make([][]byte, kvNum) for i := range keys { keys[i] = fmt.Appendf(nil, "key%03d", i) values[i] = fmt.Appendf(nil, "val%03d", i) } dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) require.NoError(t, err) // we just need to make sure there are multiple files. require.Greater(t, len(dataFiles), 1) avgKVPerFile := math.Ceil(float64(kvNum) / float64(len(dataFiles))) rangesGroupKV := 30 groupFileNumUpperBound := int(math.Ceil(float64(rangesGroupKV-1)/avgKVPerFile)) + 1 multiFileStat := mockOneMultiFileStat(dataFiles, statFiles) splitter, err := NewRangeSplitter( ctx, multiFileStat, memStore, 1000, int64(rangesGroupKV), 1000, 10, int64(math.MaxInt64), int64(math.MaxInt64), ) require.NoError(t, err) notExhausted: endKey, dataFiles, statFiles, _, _, err := splitter.SplitOneRangesGroup() require.NoError(t, err) require.LessOrEqual(t, len(dataFiles), groupFileNumUpperBound) require.LessOrEqual(t, len(statFiles), groupFileNumUpperBound) if endKey != nil { goto notExhausted } require.NoError(t, splitter.Close()) } func TestRangeSplitterStrictCase(t *testing.T) { ctx := context.Background() memStore := objstore.NewMemStorage() subDir := "/mock-test" var summary *WriterSummary writer1 := NewWriterBuilder(). SetMemorySizeLimit(2*(lengthBytes*2+10)). SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). SetOnCloseFunc(func(s *WriterSummary) { summary = s }). Build(memStore, subDir, "1") keys1 := [][]byte{ []byte("key01"), []byte("key11"), []byte("key21"), } values1 := [][]byte{ []byte("val01"), []byte("val11"), []byte("val21"), } writeKVs(t, writer1, keys1, values1) dataFiles1, statFiles1 := getKVAndStatFiles(summary) require.Len(t, dataFiles1, 2) require.Len(t, statFiles1, 2) writer2 := NewWriterBuilder(). SetMemorySizeLimit(2*(lengthBytes*2+10)). SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). SetOnCloseFunc(func(s *WriterSummary) { summary = s }). Build(memStore, subDir, "2") keys2 := [][]byte{ []byte("key02"), []byte("key12"), []byte("key22"), } values2 := [][]byte{ []byte("val02"), []byte("val12"), []byte("val22"), } writeKVs(t, writer2, keys2, values2) dataFiles2, statFiles2 := getKVAndStatFiles(summary) require.Len(t, dataFiles2, 2) require.Len(t, statFiles2, 2) writer3 := NewWriterBuilder(). SetMemorySizeLimit(2*(lengthBytes*2+10)). SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). SetOnCloseFunc(func(s *WriterSummary) { summary = s }). Build(memStore, subDir, "3") keys3 := [][]byte{ []byte("key03"), []byte("key13"), []byte("key23"), } values3 := [][]byte{ []byte("val03"), []byte("val13"), []byte("val23"), } writeKVs(t, writer3, keys3, values3) dataFiles3, statFiles3 := getKVAndStatFiles(summary) require.Len(t, dataFiles3, 2) require.Len(t, statFiles3, 2) // "/mock-test/X/0" contains "key0X" and "key1X" // "/mock-test/X/1" contains "key2X" require.Equal(t, []string{ "/mock-test/3/0", "/mock-test/3/1", }, removePartitionPrefix(t, dataFiles3)) dataFiles12 := append(append([]string{}, dataFiles1...), dataFiles2...) statFiles12 := append(append([]string{}, statFiles1...), statFiles2...) multi := mockOneMultiFileStat(dataFiles12, statFiles12) multi[0].MinKey = []byte("key01") multi[0].MaxKey = []byte("key21") multi2 := mockOneMultiFileStat(dataFiles3, statFiles3) multi2[0].MinKey = []byte("key02") multi2[0].MaxKey = []byte("key22") multiFileStat := []MultipleFilesStat{multi2[0], multi[0]} // group keys = 2, region keys = 1 splitter, err := NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 2, 1000, 1, 1000, 1, ) require.NoError(t, err) // verify the multiFileStat is sorted require.Equal(t, multi[0], multiFileStat[0]) require.Equal(t, multi2[0], multiFileStat[1]) // [key01, key03), split at key02 endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup() require.NoError(t, err) require.EqualValues(t, kv.Key("key03"), endKey) require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0"}, removePartitionPrefix(t, dataFiles)) require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0"}, removePartitionPrefix(t, statFiles)) require.Equal(t, [][]byte{[]byte("key02")}, rangeJobKeys) require.Equal(t, [][]byte{[]byte("key02")}, regionSplitKeys) // [key03, key12), split at key11 endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.EqualValues(t, kv.Key("key12"), endKey) require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0", "/mock-test/3/0"}, removePartitionPrefix(t, dataFiles)) require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0", "/mock-test/3_stat/0"}, removePartitionPrefix(t, statFiles)) require.Equal(t, [][]byte{[]byte("key11")}, rangeJobKeys) require.Equal(t, [][]byte{[]byte("key11")}, regionSplitKeys) // [key12, key21), split at key13. the last key of "/mock-test/1/0" is "key11", // so it's not used endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.EqualValues(t, kv.Key("key21"), endKey) require.Equal(t, []string{"/mock-test/2/0", "/mock-test/3/0"}, removePartitionPrefix(t, dataFiles)) require.Equal(t, []string{"/mock-test/2_stat/0", "/mock-test/3_stat/0"}, removePartitionPrefix(t, statFiles)) require.Equal(t, [][]byte{[]byte("key13")}, rangeJobKeys) require.Equal(t, [][]byte{[]byte("key13")}, regionSplitKeys) // [key21, key23), split at key22. // the last key of "/mock-test/2/0" is "key12", and the last key of "/mock-test/3/0" is "key13", // so they are not used endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.EqualValues(t, kv.Key("key23"), endKey) require.Equal(t, []string{"/mock-test/1/1", "/mock-test/2/1"}, removePartitionPrefix(t, dataFiles)) require.Equal(t, []string{"/mock-test/1_stat/1", "/mock-test/2_stat/1"}, removePartitionPrefix(t, statFiles)) require.Equal(t, [][]byte{[]byte("key22")}, rangeJobKeys) require.Equal(t, [][]byte{[]byte("key22")}, regionSplitKeys) // [key23, nil), no split key endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Equal(t, []string{"/mock-test/3/1"}, removePartitionPrefix(t, dataFiles)) require.Equal(t, []string{"/mock-test/3_stat/1"}, removePartitionPrefix(t, statFiles)) require.Len(t, rangeJobKeys, 0) require.Len(t, regionSplitKeys, 0) // read after drain all data endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Len(t, dataFiles, 0) require.Len(t, statFiles, 0) require.Len(t, rangeJobKeys, 0) require.Len(t, regionSplitKeys, 0) require.NoError(t, splitter.Close()) } func TestExactlyKeyNum(t *testing.T) { ctx := context.Background() memStore := objstore.NewMemStorage() kvNum := 3 keys := make([][]byte, kvNum) values := make([][]byte, kvNum) for i := range keys { keys[i] = fmt.Appendf(nil, "key%03d", i) values[i] = fmt.Appendf(nil, "value%03d", i) } subDir := "/mock-test" var summary *WriterSummary writer := NewWriterBuilder(). SetMemorySizeLimit(15). SetPropSizeDistance(1). SetPropKeysDistance(1). SetOnCloseFunc(func(s *WriterSummary) { summary = s }). Build(memStore, subDir, "5") writeKVs(t, writer, keys, values) dataFiles, statFiles := getKVAndStatFiles(summary) multiFileStat := mockOneMultiFileStat(dataFiles, statFiles) // maxRangeKeys = 3 splitter, err := NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 100, 1000, 3, 1000, 3, ) require.NoError(t, err) endKey, splitDataFiles, splitStatFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Equal(t, dataFiles, splitDataFiles) require.Equal(t, statFiles, splitStatFiles) require.Len(t, rangeJobKeys, 0) require.Len(t, regionSplitKeys, 0) // rangesGroupKeys = 3 splitter, err = NewRangeSplitter( ctx, multiFileStat, memStore, 1000, 3, 1000, 1, 1000, 2, ) require.NoError(t, err) endKey, splitDataFiles, splitStatFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup() require.NoError(t, err) require.Nil(t, endKey) require.Equal(t, dataFiles, splitDataFiles) require.Equal(t, statFiles, splitStatFiles) require.Equal(t, [][]byte{[]byte("key001"), []byte("key002")}, rangeJobKeys) require.Equal(t, [][]byte{[]byte("key002")}, regionSplitKeys) } func Test3KFilesRangeSplitter(t *testing.T) { store := openTestingStorage(t) ctx := context.Background() // use HTTP pprof to debug go func() { http.ListenAndServe("localhost:6060", nil) }() // test the case that after one round merge step, we have 3000 stat files. In // current merge step parameters, we will merge 4000 files of 256MB into 16 // files, so we directly write 4000*256MB/16 = 64GB data to onefile writer. fileNum := 3000 statCh := make(chan []MultipleFilesStat, fileNum) onClose := func(s *WriterSummary) { statCh <- s.MultipleFilesStats } eg := errgroup.Group{} eg.SetLimit(30) for i := range fileNum { eg.Go(func() error { w := NewWriterBuilder(). SetMemorySizeLimit(DefaultMemSizeLimit). SetBlockSize(32*units.MiB). // dataKVGroupBlockSize SetPropKeysDistance(8*1024). SetPropSizeDistance(size.MB). SetOnCloseFunc(onClose). BuildOneFile(store, "/mock-test", uuid.New().String()) w.InitPartSizeAndLogger(ctx, int64(5*size.MB)) // we don't need data files err := w.dataWriter.Close(ctx) require.NoError(t, err) w.dataWriter = objstore.NoopWriter{} kvSize := 20 * size.KB keySize := size.KB key := make([]byte, keySize) key[keySize-1] = byte(i % 256) key[keySize-2] = byte(i / 256) minKey := slices.Clone(key) var maxKey []byte memSize := uint64(0) for j := range int(64 * size.GB / kvSize) { // copied from OneFileWriter.WriteRow if memSize >= DefaultMemSizeLimit { memSize = 0 w.kvStore.finish() encodedStat := w.rc.encode() _, err := w.statWriter.Write(ctx, encodedStat) if err != nil { return err } w.rc.reset() // the new prop should have the same offset with kvStore. w.rc.currProp.offset = w.kvStore.offset } if len(w.rc.currProp.firstKey) == 0 { w.rc.currProp.firstKey = key } w.rc.currProp.lastKey = key memSize += kvSize w.totalSize += kvSize w.rc.currProp.size += kvSize - 2*lengthBytes w.rc.currProp.keys++ if w.rc.currProp.size >= w.rc.propSizeDist || w.rc.currProp.keys >= w.rc.propKeysDist { newProp := *w.rc.currProp w.rc.props = append(w.rc.props, &newProp) // reset currProp, and start to update this prop. w.rc.currProp.firstKey = nil w.rc.currProp.offset = memSize w.rc.currProp.keys = 0 w.rc.currProp.size = 0 } if j == int(64*size.GB/kvSize)-1 { maxKey = slices.Clone(key) } // increase the key for k := keySize - 3; k >= 0; k-- { key[k]++ if key[k] != 0 { break } } } // copied from mergeOverlappingFilesInternal var stat MultipleFilesStat stat.Filenames = append(stat.Filenames, [2]string{w.dataFile, w.statFile}) stat.build([]kv.Key{minKey}, []kv.Key{maxKey}) statCh <- []MultipleFilesStat{stat} return w.Close(ctx) }) } require.NoError(t, eg.Wait()) multiStat := make([]MultipleFilesStat, 0, fileNum) for range fileNum { multiStat = append(multiStat, <-statCh...) } splitter, err := NewRangeSplitter( ctx, multiStat, store, int64(config.DefaultBatchSize), int64(math.MaxInt64), int64(config.SplitRegionSize), int64(config.SplitRegionKeys), int64(math.MaxInt64), int64(math.MaxInt64), ) require.NoError(t, err) var lastEndKey []byte for { endKey, _, statFiles, _, _, err := splitter.SplitOneRangesGroup() require.NoError(t, err) require.Greater(t, len(statFiles), 0) if endKey == nil { break } if lastEndKey != nil { cmp := bytes.Compare(endKey, lastEndKey) require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey) } lastEndKey = slices.Clone(endKey) } err = splitter.Close() require.NoError(t, err) } func TestCalRangeSize(t *testing.T) { var17 := 1.7 var35 := 3.5 commonUsedRegionSizeSettings := [][2]int64{ {96 * units.MiB, 960_000}, {256 * units.MiB, 2_560_000}, {512 * units.MiB, 5_120_000}, {units.GiB, 10_240_000}, } cases := []struct { memPerCore int64 rangeInfos [][3]int64 // [range-size, range-keys, sst-file-num] }{ {memPerCore: int64(var17 * float64(units.GiB)), rangeInfos: [][3]int64{ {2 * 96 * units.MiB, 2 * 960_000, 1}, {256 * units.MiB, 2_560_000, 1}, {256*units.MiB + 1, 2_560_000, 2}, {256*units.MiB + 1, 2_560_000, 4}, }}, {memPerCore: int64(var35 * float64(units.GiB)), rangeInfos: [][3]int64{ {5 * 96 * units.MiB, 5 * 960_000, 1}, {512 * units.MiB, 5_120_000, 1}, {512 * units.MiB, 5_120_000, 1}, {512*units.MiB + 1, 5_120_000, 2}, }}, } for i, c := range cases { for j, rs := range commonUsedRegionSizeSettings { t.Run(fmt.Sprintf("%d-%d", i, j), func(t *testing.T) { regionSplitSize, regionSplitKeys := rs[0], rs[1] rangeSize, rangeKeys := CalRangeSize(c.memPerCore, regionSplitSize, regionSplitKeys) expectedRangeSize, expectedRangeKey, expectedFileNum := c.rangeInfos[j][0], c.rangeInfos[j][1], c.rangeInfos[j][2] require.EqualValues(t, expectedRangeSize, rangeSize) require.EqualValues(t, expectedRangeKey, rangeKeys) fmt.Println(rangeSize, rangeKeys) if expectedRangeSize >= regionSplitSize { require.EqualValues(t, 1, expectedFileNum) require.Zero(t, rangeSize%regionSplitSize) } else { require.EqualValues(t, expectedFileNum, int(math.Ceil(float64(regionSplitSize)/float64(rangeSize)))) } }) } } }