// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"net/http"
	_ "net/http/pprof"
	"slices"
	"sort"
	"testing"
	"time"

	"github.com/docker/go-units"
	"github.com/google/uuid"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/config"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/util/size"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/rand"
	"golang.org/x/sync/errgroup"
)

func TestGeneralProperties(t *testing.T) {
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	t.Logf("seed: %d", seed)

	ctx := context.Background()
	memStore := objstore.NewMemStorage()

	kvNum := rand.Intn(1000) + 100
	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keyLen := rand.Intn(100) + 1
		valueLen := rand.Intn(100) + 1
		keys[i] = make([]byte, keyLen+2)
		values[i] = make([]byte, valueLen+2)
		rand.Read(keys[i][:keyLen])
		rand.Read(values[i][:valueLen])
		// the two trailing bytes encode the index i, so every key (and value)
		// is unique even if the random prefixes happen to collide.
		keys[i][keyLen] = byte(i / 255)
		keys[i][keyLen+1] = byte(i % 255)
		values[i][valueLen] = byte(i / 255)
		values[i][valueLen+1] = byte(i % 255)
	}

	dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values)
	require.NoError(t, err)
	multiFileStat := mockOneMultiFileStat(dataFiles, statFiles)
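	// The six numeric arguments follow NewRangeSplitter's parameter order as
	// exercised by the later tests in this file ("maxRangeKeys = 3",
	// "group keys = 2, region keys = 1", ...); they appear to be:
	// rangesGroupSize, rangesGroupKeys, rangeJobSize, rangeJobKeys,
	// regionSplitSize and regionSplitKeys.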
	splitter, err := NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 30, 1000, 1, 35, 1000,
	)
	require.NoError(t, err)
	var lastEndKey []byte
notExhausted:
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)

	// endKey should be strictly greater than lastEndKey
	if lastEndKey != nil && endKey != nil {
		cmp := bytes.Compare(endKey, lastEndKey)
		require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey)
	}

	// check dataFiles and statFiles
	lenDataFiles := len(dataFiles)
	lenStatFiles := len(statFiles)
	require.Equal(t, lenDataFiles, lenStatFiles)
	require.Greater(t, lenDataFiles, 0)

	for _, toCheck := range []struct {
		varName string
		keys    [][]byte
	}{{"rangeJobKeys", rangeJobKeys}, {"regionSplitKeys", regionSplitKeys}} {
		if len(toCheck.keys) > 0 {
			// keys should be strictly increasing
			for i := 1; i < len(toCheck.keys); i++ {
				cmp := bytes.Compare(toCheck.keys[i], toCheck.keys[i-1])
				require.Equal(t, 1, cmp, "%s: %v", toCheck.varName, toCheck.keys)
			}
			// the first split key should be strictly greater than lastEndKey
			cmp := bytes.Compare(toCheck.keys[0], lastEndKey)
			require.Equal(t, 1, cmp, "%s: %v, lastEndKey: %v", toCheck.varName, toCheck.keys, lastEndKey)
			// the last split key should be strictly less than endKey
			if endKey != nil {
				cmp = bytes.Compare(toCheck.keys[len(toCheck.keys)-1], endKey)
				require.Equal(t, -1, cmp, "%s: %v, endKey: %v", toCheck.varName, toCheck.keys, endKey)
			}
		}
	}

	lastEndKey = endKey
	if endKey != nil {
		goto notExhausted
	}
	require.NoError(t, splitter.Close())
}

func writeKVs(t *testing.T, writer *Writer, keys [][]byte, values [][]byte) {
	ctx := context.Background()
	for i := range keys {
		err := writer.WriteRow(ctx, keys[i], values[i], nil)
		require.NoError(t, err)
	}
	require.NoError(t, writer.Close(ctx))
}

func getKVAndStatFiles(sum *WriterSummary) (dataFiles []string, statsFiles []string) {
	for _, ms := range sum.MultipleFilesStats {
		for _, f := range ms.Filenames {
			dataFiles = append(dataFiles, f[0])
			statsFiles = append(statsFiles, f[1])
		}
	}
	return
}

func removePartitionPrefix(t *testing.T, in []string) []string {
	out := make([]string, 0, len(in))
	for _, s := range in {
		bs := []byte(s)
		idx := bytes.IndexByte(bs, '/')
		require.GreaterOrEqual(t, idx, 0)
		require.True(t, isValidPartition(bs[:idx]))
		// keep the '/' that follows the partition prefix, since all expected
		// values in these tests include it.
		out = append(out, s[idx:])
	}
	sort.Strings(out)
	return out
}

func TestOnlyOneGroup(t *testing.T) {
	ctx := context.Background()
	memStore := objstore.NewMemStorage()
	subDir := "/mock-test"

	var summary *WriterSummary
	writer := NewWriterBuilder().
		SetMemorySizeLimit(20).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(memStore, subDir, "5")

	writeKVs(t, writer, [][]byte{{1}, {2}}, [][]byte{{1}, {2}})
	dataFiles, statFiles := getKVAndStatFiles(summary)
	require.Len(t, dataFiles, 1)
	require.Len(t, statFiles, 1)

	multiFileStat := mockOneMultiFileStat(dataFiles, statFiles)

	splitter, err := NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 30, 1000, 10, int64(math.MaxInt64), int64(math.MaxInt64),
	)
	require.NoError(t, err)
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 1)
	require.Len(t, statFiles, 1)
	require.Len(t, rangeJobKeys, 0)
	require.Len(t, regionSplitKeys, 0)
	require.NoError(t, splitter.Close())
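
	// Same data, but with the range-job key limit lowered from 10 to 1: key
	// {2} now starts a new range job, while everything still fits in one group.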
	splitter, err = NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 30, 1000, 1, int64(math.MaxInt64), int64(math.MaxInt64),
	)
	require.NoError(t, err)
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 1)
	require.Len(t, statFiles, 1)
	require.Equal(t, [][]byte{{2}}, rangeJobKeys)
	require.Len(t, regionSplitKeys, 0)
	require.NoError(t, splitter.Close())
}

func TestSortedData(t *testing.T) {
	ctx := context.Background()
	memStore := objstore.NewMemStorage()
	kvNum := 100

	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keys[i] = fmt.Appendf(nil, "key%03d", i)
		values[i] = fmt.Appendf(nil, "val%03d", i)
	}

	dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values)
	require.NoError(t, err)
	// we just need to make sure there are multiple files.
	require.Greater(t, len(dataFiles), 1)
	avgKVPerFile := math.Ceil(float64(kvNum) / float64(len(dataFiles)))
	rangesGroupKV := 30
	groupFileNumUpperBound := int(math.Ceil(float64(rangesGroupKV-1)/avgKVPerFile)) + 1
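	// A group holds at most rangesGroupKV keys. If the group's first key lands
	// at the tail of one file, the remaining rangesGroupKV-1 keys span at most
	// ceil((rangesGroupKV-1)/avgKVPerFile) additional files, hence the +1
	// (this assumes MockExternalEngine spreads keys evenly across files).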

	multiFileStat := mockOneMultiFileStat(dataFiles, statFiles)
	splitter, err := NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, int64(rangesGroupKV), 1000, 10, int64(math.MaxInt64), int64(math.MaxInt64),
	)
	require.NoError(t, err)

notExhausted:
	endKey, dataFiles, statFiles, _, _, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.LessOrEqual(t, len(dataFiles), groupFileNumUpperBound)
	require.LessOrEqual(t, len(statFiles), groupFileNumUpperBound)
	if endKey != nil {
		goto notExhausted
	}
	require.NoError(t, splitter.Close())
}

func TestRangeSplitterStrictCase(t *testing.T) {
	ctx := context.Background()
	memStore := objstore.NewMemStorage()
	subDir := "/mock-test"

	var summary *WriterSummary
	writer1 := NewWriterBuilder().
		SetMemorySizeLimit(2*(lengthBytes*2+10)).
		SetBlockSize(2*(lengthBytes*2+10)).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(memStore, subDir, "1")
	keys1 := [][]byte{
		[]byte("key01"), []byte("key11"), []byte("key21"),
	}
	values1 := [][]byte{
		[]byte("val01"), []byte("val11"), []byte("val21"),
	}

	writeKVs(t, writer1, keys1, values1)
	dataFiles1, statFiles1 := getKVAndStatFiles(summary)
	require.Len(t, dataFiles1, 2)
	require.Len(t, statFiles1, 2)

	writer2 := NewWriterBuilder().
		SetMemorySizeLimit(2*(lengthBytes*2+10)).
		SetBlockSize(2*(lengthBytes*2+10)).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(memStore, subDir, "2")
	keys2 := [][]byte{
		[]byte("key02"), []byte("key12"), []byte("key22"),
	}
	values2 := [][]byte{
		[]byte("val02"), []byte("val12"), []byte("val22"),
	}
	writeKVs(t, writer2, keys2, values2)
	dataFiles2, statFiles2 := getKVAndStatFiles(summary)
	require.Len(t, dataFiles2, 2)
	require.Len(t, statFiles2, 2)

	writer3 := NewWriterBuilder().
		SetMemorySizeLimit(2*(lengthBytes*2+10)).
		SetBlockSize(2*(lengthBytes*2+10)).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(memStore, subDir, "3")
	keys3 := [][]byte{
		[]byte("key03"), []byte("key13"), []byte("key23"),
	}
	values3 := [][]byte{
		[]byte("val03"), []byte("val13"), []byte("val23"),
	}
	writeKVs(t, writer3, keys3, values3)
	dataFiles3, statFiles3 := getKVAndStatFiles(summary)
	require.Len(t, dataFiles3, 2)
	require.Len(t, statFiles3, 2)

	// "/mock-test/X/0" contains "key0X" and "key1X"
	// "/mock-test/X/1" contains "key2X"
	require.Equal(t, []string{
		"/mock-test/3/0", "/mock-test/3/1",
	}, removePartitionPrefix(t, dataFiles3))
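
	// The two MultipleFilesStat below get overlapping, hand-set key ranges and
	// are deliberately listed out of MinKey order, so the assertions after
	// NewRangeSplitter can verify that the constructor sorts them in place.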
	dataFiles12 := append(append([]string{}, dataFiles1...), dataFiles2...)
	statFiles12 := append(append([]string{}, statFiles1...), statFiles2...)
	multi := mockOneMultiFileStat(dataFiles12, statFiles12)
	multi[0].MinKey = []byte("key01")
	multi[0].MaxKey = []byte("key21")
	multi2 := mockOneMultiFileStat(dataFiles3, statFiles3)
	multi2[0].MinKey = []byte("key02")
	multi2[0].MaxKey = []byte("key22")
	multiFileStat := []MultipleFilesStat{multi2[0], multi[0]}
	// group keys = 2, region keys = 1
	splitter, err := NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 2, 1000, 1, 1000, 1,
	)
	require.NoError(t, err)

	// verify that NewRangeSplitter sorted multiFileStat in place
	require.Equal(t, multi[0], multiFileStat[0])
	require.Equal(t, multi2[0], multiFileStat[1])

	// [key01, key03), split at key02
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key03"), endKey)
	require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0"}, removePartitionPrefix(t, dataFiles))
	require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0"}, removePartitionPrefix(t, statFiles))
	require.Equal(t, [][]byte{[]byte("key02")}, rangeJobKeys)
	require.Equal(t, [][]byte{[]byte("key02")}, regionSplitKeys)

	// [key03, key12), split at key11
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key12"), endKey)
	require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0", "/mock-test/3/0"}, removePartitionPrefix(t, dataFiles))
	require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0", "/mock-test/3_stat/0"}, removePartitionPrefix(t, statFiles))
	require.Equal(t, [][]byte{[]byte("key11")}, rangeJobKeys)
	require.Equal(t, [][]byte{[]byte("key11")}, regionSplitKeys)

	// [key12, key21), split at key13. The last key of "/mock-test/1/0" is
	// "key11", so that file is not part of this group.
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key21"), endKey)
	require.Equal(t, []string{"/mock-test/2/0", "/mock-test/3/0"}, removePartitionPrefix(t, dataFiles))
	require.Equal(t, []string{"/mock-test/2_stat/0", "/mock-test/3_stat/0"}, removePartitionPrefix(t, statFiles))
	require.Equal(t, [][]byte{[]byte("key13")}, rangeJobKeys)
	require.Equal(t, [][]byte{[]byte("key13")}, regionSplitKeys)

	// [key21, key23), split at key22. The last key of "/mock-test/2/0" is
	// "key12" and the last key of "/mock-test/3/0" is "key13", so those files
	// are not part of this group.
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key23"), endKey)
	require.Equal(t, []string{"/mock-test/1/1", "/mock-test/2/1"}, removePartitionPrefix(t, dataFiles))
	require.Equal(t, []string{"/mock-test/1_stat/1", "/mock-test/2_stat/1"}, removePartitionPrefix(t, statFiles))
	require.Equal(t, [][]byte{[]byte("key22")}, rangeJobKeys)
	require.Equal(t, [][]byte{[]byte("key22")}, regionSplitKeys)

	// [key23, nil), no split key
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, []string{"/mock-test/3/1"}, removePartitionPrefix(t, dataFiles))
	require.Equal(t, []string{"/mock-test/3_stat/1"}, removePartitionPrefix(t, statFiles))
	require.Len(t, rangeJobKeys, 0)
	require.Len(t, regionSplitKeys, 0)

	// read again after all data has been drained
	endKey, dataFiles, statFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 0)
	require.Len(t, statFiles, 0)
	require.Len(t, rangeJobKeys, 0)
	require.Len(t, regionSplitKeys, 0)
	require.NoError(t, splitter.Close())
}

func TestExactlyKeyNum(t *testing.T) {
	ctx := context.Background()
	memStore := objstore.NewMemStorage()
	kvNum := 3

	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keys[i] = fmt.Appendf(nil, "key%03d", i)
		values[i] = fmt.Appendf(nil, "value%03d", i)
	}

	subDir := "/mock-test"

	var summary *WriterSummary
	writer := NewWriterBuilder().
		SetMemorySizeLimit(15).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(memStore, subDir, "5")

	writeKVs(t, writer, keys, values)
	dataFiles, statFiles := getKVAndStatFiles(summary)
	multiFileStat := mockOneMultiFileStat(dataFiles, statFiles)

	// maxRangeKeys = 3
	splitter, err := NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 100, 1000, 3, 1000, 3,
	)
	require.NoError(t, err)
	endKey, splitDataFiles, splitStatFiles, rangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, dataFiles, splitDataFiles)
	require.Equal(t, statFiles, splitStatFiles)
	require.Len(t, rangeJobKeys, 0)
	require.Len(t, regionSplitKeys, 0)
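
	// With rangeJobKeys = 1 and regionSplitKeys = 2 below, every key after the
	// first starts a new range job (boundaries key001 and key002), and a
	// region split point lands after every second key (key002).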
	// rangesGroupKeys = 3
	splitter, err = NewRangeSplitter(
		ctx, multiFileStat, memStore, 1000, 3, 1000, 1, 1000, 2,
	)
	require.NoError(t, err)
	endKey, splitDataFiles, splitStatFiles, rangeJobKeys, regionSplitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, dataFiles, splitDataFiles)
	require.Equal(t, statFiles, splitStatFiles)
	require.Equal(t, [][]byte{[]byte("key001"), []byte("key002")}, rangeJobKeys)
	require.Equal(t, [][]byte{[]byte("key002")}, regionSplitKeys)
}

func Test3KFilesRangeSplitter(t *testing.T) {
	store := openTestingStorage(t)
	ctx := context.Background()

	// use HTTP pprof to debug
	go func() {
		//nolint:errcheck
		http.ListenAndServe("localhost:6060", nil)
	}()

	// Test the case where one round of the merge step leaves us with 3000 stat
	// files. With the current merge step parameters, we merge 4000 files of
	// 256MB into 16 files, so we directly write 4000*256MB/16 = 64GB of data
	// through each one-file writer.
	fileNum := 3000
	statCh := make(chan []MultipleFilesStat, fileNum)
	onClose := func(s *WriterSummary) {
		statCh <- s.MultipleFilesStats
	}

	eg := errgroup.Group{}
	eg.SetLimit(30)
	for i := range fileNum {
		eg.Go(func() error {
			w := NewWriterBuilder().
				SetMemorySizeLimit(DefaultMemSizeLimit).
				SetBlockSize(32*units.MiB). // dataKVGroupBlockSize
				SetPropKeysDistance(8*1024).
				SetPropSizeDistance(size.MB).
				SetOnCloseFunc(onClose).
				BuildOneFile(store, "/mock-test", uuid.New().String())
			w.InitPartSizeAndLogger(ctx, int64(5*size.MB))
			// we don't need data files, so swap in a no-op data writer
			err := w.dataWriter.Close(ctx)
			require.NoError(t, err)
			w.dataWriter = objstore.NoopWriter{}

			kvSize := 20 * size.KB
			keySize := size.KB
			key := make([]byte, keySize)
			key[keySize-1] = byte(i % 256)
			key[keySize-2] = byte(i / 256)
			minKey := slices.Clone(key)
			var maxKey []byte

			memSize := uint64(0)
			for j := range int(64 * size.GB / kvSize) {
				// copied from OneFileWriter.WriteRow
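				// The loop below mirrors that bookkeeping: flush an encoded
				// stat block whenever memSize reaches DefaultMemSizeLimit, and
				// cut a new range property whenever currProp reaches
				// propSizeDist / propKeysDist. Only the stat stream matters
				// here, since the data writer was replaced with a no-op above.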
				if memSize >= DefaultMemSizeLimit {
					memSize = 0
					w.kvStore.finish()
					encodedStat := w.rc.encode()
					_, err := w.statWriter.Write(ctx, encodedStat)
					if err != nil {
						return err
					}
					w.rc.reset()
					// the new prop should have the same offset as the kvStore.
					w.rc.currProp.offset = w.kvStore.offset
				}
				if len(w.rc.currProp.firstKey) == 0 {
					w.rc.currProp.firstKey = key
				}
				w.rc.currProp.lastKey = key

				memSize += kvSize
				w.totalSize += kvSize
				w.rc.currProp.size += kvSize - 2*lengthBytes
				w.rc.currProp.keys++

				if w.rc.currProp.size >= w.rc.propSizeDist ||
					w.rc.currProp.keys >= w.rc.propKeysDist {
					newProp := *w.rc.currProp
					w.rc.props = append(w.rc.props, &newProp)
					// reset currProp and start to update the new prop.
					w.rc.currProp.firstKey = nil
					w.rc.currProp.offset = memSize
					w.rc.currProp.keys = 0
					w.rc.currProp.size = 0
				}

				if j == int(64*size.GB/kvSize)-1 {
					maxKey = slices.Clone(key)
				}

				// increase the key
				for k := keySize - 3; k >= 0; k-- {
					key[k]++
					if key[k] != 0 {
						break
					}
				}
			}

			// copied from mergeOverlappingFilesInternal
			var stat MultipleFilesStat
			stat.Filenames = append(stat.Filenames,
				[2]string{w.dataFile, w.statFile})
			stat.build([]kv.Key{minKey}, []kv.Key{maxKey})
			statCh <- []MultipleFilesStat{stat}
			return w.Close(ctx)
		})
	}

	require.NoError(t, eg.Wait())

	multiStat := make([]MultipleFilesStat, 0, fileNum)
	for range fileNum {
		multiStat = append(multiStat, <-statCh...)
	}
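
	// Following the parameter order noted in TestGeneralProperties: a
	// production-like group size (config.DefaultBatchSize) with unlimited
	// group keys, range jobs capped at config.SplitRegionSize /
	// config.SplitRegionKeys, and region-split thresholds effectively
	// disabled.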
	splitter, err := NewRangeSplitter(
		ctx,
		multiStat,
		store,
		int64(config.DefaultBatchSize),
		int64(math.MaxInt64),
		int64(config.SplitRegionSize),
		int64(config.SplitRegionKeys),
		int64(math.MaxInt64),
		int64(math.MaxInt64),
	)
	require.NoError(t, err)
	var lastEndKey []byte
	for {
		endKey, _, statFiles, _, _, err := splitter.SplitOneRangesGroup()
		require.NoError(t, err)
		require.Greater(t, len(statFiles), 0)
		if endKey == nil {
			break
		}
		if lastEndKey != nil {
			cmp := bytes.Compare(endKey, lastEndKey)
			require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey)
		}
		lastEndKey = slices.Clone(endKey)
	}
	err = splitter.Close()
	require.NoError(t, err)
}

func TestCalRangeSize(t *testing.T) {
	var17 := 1.7
	var35 := 3.5
	commonUsedRegionSizeSettings := [][2]int64{
		{96 * units.MiB, 960_000},
		{256 * units.MiB, 2_560_000},
		{512 * units.MiB, 5_120_000},
		{units.GiB, 10_240_000},
	}
	cases := []struct {
		memPerCore int64
		rangeInfos [][3]int64 // [range-size, range-keys, sst-file-num]
	}{
		{memPerCore: int64(var17 * float64(units.GiB)), rangeInfos: [][3]int64{
			{2 * 96 * units.MiB, 2 * 960_000, 1},
			{256 * units.MiB, 2_560_000, 1},
			{256*units.MiB + 1, 2_560_000, 2},
			{256*units.MiB + 1, 2_560_000, 4},
		}},
		{memPerCore: int64(var35 * float64(units.GiB)), rangeInfos: [][3]int64{
			{5 * 96 * units.MiB, 5 * 960_000, 1},
			{512 * units.MiB, 5_120_000, 1},
			{512 * units.MiB, 5_120_000, 1},
			{512*units.MiB + 1, 5_120_000, 2},
		}},
	}
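
	// Expected invariant, per the assertions in the loop below: when the
	// computed range size is at least the region split size it must be an
	// exact multiple of it (sst-file-num is 1); otherwise one region is
	// covered by ceil(regionSplitSize/rangeSize) SST files.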
	for i, c := range cases {
		for j, rs := range commonUsedRegionSizeSettings {
			t.Run(fmt.Sprintf("%d-%d", i, j), func(t *testing.T) {
				regionSplitSize, regionSplitKeys := rs[0], rs[1]
				rangeSize, rangeKeys := CalRangeSize(c.memPerCore, regionSplitSize, regionSplitKeys)
				expectedRangeSize, expectedRangeKey, expectedFileNum := c.rangeInfos[j][0], c.rangeInfos[j][1], c.rangeInfos[j][2]
				require.EqualValues(t, expectedRangeSize, rangeSize)
				require.EqualValues(t, expectedRangeKey, rangeKeys)
				t.Log(rangeSize, rangeKeys)
				if expectedRangeSize >= regionSplitSize {
					require.EqualValues(t, 1, expectedFileNum)
					require.Zero(t, rangeSize%regionSplitSize)
				} else {
					require.EqualValues(t, expectedFileNum, int(math.Ceil(float64(regionSplitSize)/float64(rangeSize))))
				}
			})
		}
	}
}