diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 8b992ccbbf..a01c8e9664 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "file.go", "iter.go", "kv_reader.go", + "split.go", "stat_reader.go", "util.go", "writer.go", @@ -46,12 +47,13 @@ go_test( "engine_test.go", "file_test.go", "iter_test.go", + "split_test.go", "util_test.go", "writer_test.go", ], embed = [":external"], flaky = True, - shard_count = 22, + shard_count = 28, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/external/codec.go b/br/pkg/lightning/backend/external/codec.go index 97e7635321..7a75068d9b 100644 --- a/br/pkg/lightning/backend/external/codec.go +++ b/br/pkg/lightning/backend/external/codec.go @@ -18,11 +18,17 @@ import ( "encoding/binary" ) +// rangeProperty stores properties of a range: +// - key: the start key of the range. +// - offset: the start offset of the range in the file. +// - size: the size of the range. +// - keys: the number of keys in the range. type rangeProperty struct { - key []byte - offset uint64 - size uint64 - keys uint64 + firstKey []byte + lastKey []byte + offset uint64 + size uint64 + keys uint64 } // decodeMultiProps is only used for test. @@ -40,22 +46,31 @@ func decodeMultiProps(data []byte) []*rangeProperty { func decodeProp(data []byte) *rangeProperty { rp := &rangeProperty{} - keyLen := binary.BigEndian.Uint32(data[0:4]) - rp.key = data[4 : 4+keyLen] - rp.size = binary.BigEndian.Uint64(data[4+keyLen : 12+keyLen]) - rp.keys = binary.BigEndian.Uint64(data[12+keyLen : 20+keyLen]) - rp.offset = binary.BigEndian.Uint64(data[20+keyLen : 28+keyLen]) + n := 0 + keyLen := int(binary.BigEndian.Uint32(data[n : n+4])) + n += 4 + rp.firstKey = data[n : n+keyLen] + n += keyLen + keyLen = int(binary.BigEndian.Uint32(data[n : n+4])) + n += 4 + rp.lastKey = data[n : n+keyLen] + n += keyLen + rp.size = binary.BigEndian.Uint64(data[n : n+8]) + n += 8 + rp.keys = binary.BigEndian.Uint64(data[n : n+8]) + n += 8 + rp.offset = binary.BigEndian.Uint64(data[n : n+8]) return rp } -// keyLen + p.size + p.keys + p.offset -const propertyLengthExceptKey = 4 + 8 + 8 + 8 +// keyLen * 2 + p.size + p.keys + p.offset +const propertyLengthExceptKeys = 4*2 + 8 + 8 + 8 func encodeMultiProps(buf []byte, props []*rangeProperty) []byte { var propLen [4]byte for _, p := range props { binary.BigEndian.PutUint32(propLen[:], - uint32(propertyLengthExceptKey+len(p.key))) + uint32(propertyLengthExceptKeys+len(p.firstKey)+len(p.lastKey))) buf = append(buf, propLen[:4]...) buf = encodeProp(buf, p) } @@ -64,9 +79,12 @@ func encodeMultiProps(buf []byte, props []*rangeProperty) []byte { func encodeProp(buf []byte, r *rangeProperty) []byte { var b [8]byte - binary.BigEndian.PutUint32(b[:], uint32(len(r.key))) + binary.BigEndian.PutUint32(b[:], uint32(len(r.firstKey))) buf = append(buf, b[:4]...) - buf = append(buf, r.key...) + buf = append(buf, r.firstKey...) + binary.BigEndian.PutUint32(b[:], uint32(len(r.lastKey))) + buf = append(buf, b[:4]...) + buf = append(buf, r.lastKey...) binary.BigEndian.PutUint64(b[:], r.size) buf = append(buf, b[:]...) binary.BigEndian.PutUint64(b[:], r.keys) diff --git a/br/pkg/lightning/backend/external/codec_test.go b/br/pkg/lightning/backend/external/codec_test.go index 02163ac6a0..9c54146a78 100644 --- a/br/pkg/lightning/backend/external/codec_test.go +++ b/br/pkg/lightning/backend/external/codec_test.go @@ -23,10 +23,11 @@ import ( func TestRangePropertyCodec(t *testing.T) { prop := &rangeProperty{ - key: []byte("key"), - offset: 1, - size: 2, - keys: 3, + firstKey: []byte("key"), + lastKey: []byte("key2"), + offset: 1, + size: 2, + keys: 3, } buf := encodeProp(nil, prop) prop2 := decodeProp(buf) @@ -34,7 +35,8 @@ func TestRangePropertyCodec(t *testing.T) { p1, p2, p3 := &rangeProperty{}, &rangeProperty{}, &rangeProperty{} for i, p := range []*rangeProperty{p1, p2, p3} { - p.key = []byte(fmt.Sprintf("key%d", i)) + p.firstKey = []byte(fmt.Sprintf("key%d", i)) + p.lastKey = []byte(fmt.Sprintf("key%d9", i)) p.offset = uint64(10 * i) p.size = uint64(20 * i) p.keys = uint64(30 * i) @@ -43,3 +45,9 @@ func TestRangePropertyCodec(t *testing.T) { props := decodeMultiProps(buf) require.EqualValues(t, []*rangeProperty{p1, p2, p3}, props) } + +func TestPropertyLengthExceptKeys(t *testing.T) { + zero := &rangeProperty{} + bs := encodeProp(nil, zero) + require.EqualValues(t, propertyLengthExceptKeys, len(bs)) +} diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index 685f0495e4..1bed11e20d 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -57,36 +57,43 @@ func NewKeyValueStore( // appended to the rangePropertiesCollector with current status. // `key` must be in strictly ascending order for invocations of a KeyValueStore. func (s *KeyValueStore) AddKeyValue(key, value []byte) error { - kvLen := len(key) + len(value) + 16 - var b [8]byte + var ( + b [8]byte + kvLen = 0 + ) // data layout: keyLen + key + valueLen + value - _, err := s.dataWriter.Write( + n, err := s.dataWriter.Write( s.ctx, binary.BigEndian.AppendUint64(b[:0], uint64(len(key))), ) if err != nil { return err } - _, err = s.dataWriter.Write(s.ctx, key) + kvLen += n + n, err = s.dataWriter.Write(s.ctx, key) if err != nil { return err } - _, err = s.dataWriter.Write( + kvLen += n + n, err = s.dataWriter.Write( s.ctx, binary.BigEndian.AppendUint64(b[:0], uint64(len(value))), ) if err != nil { return err } - _, err = s.dataWriter.Write(s.ctx, value) + kvLen += n + n, err = s.dataWriter.Write(s.ctx, value) if err != nil { return err } + kvLen += n - if len(s.rc.currProp.key) == 0 { - s.rc.currProp.key = key + if len(s.rc.currProp.firstKey) == 0 { + s.rc.currProp.firstKey = key } + s.rc.currProp.lastKey = key s.offset += uint64(kvLen) s.rc.currProp.size += uint64(len(key) + len(value)) @@ -97,7 +104,7 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error { newProp := *s.rc.currProp s.rc.props = append(s.rc.props, &newProp) - s.rc.currProp.key = nil + s.rc.currProp.firstKey = nil s.rc.currProp.offset = s.offset s.rc.currProp.keys = 0 s.rc.currProp.size = 0 diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index 352a94682d..c3e2a79cd2 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -55,10 +55,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { require.NoError(t, err) require.Len(t, rc.props, 1) expected := &rangeProperty{ - key: k1, - offset: 0, - size: uint64(len(k1) + len(v1) + len(k2) + len(v2)), - keys: 2, + firstKey: k1, + lastKey: k2, + offset: 0, + size: uint64(len(k1) + len(v1) + len(k2) + len(v2)), + keys: 2, } require.Equal(t, expected, rc.props[0]) encoded = rc.encode() @@ -74,10 +75,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { require.NoError(t, err) kvStore.Close() expected = &rangeProperty{ - key: k3, - offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16), - size: uint64(len(k3) + len(v3)), - keys: 1, + firstKey: k3, + lastKey: k3, + offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16), + size: uint64(len(k3) + len(v3)), + keys: 1, } require.Len(t, rc.props, 2) require.Equal(t, expected, rc.props[1]) @@ -95,10 +97,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { require.NoError(t, err) require.Len(t, rc.props, 1) expected = &rangeProperty{ - key: k1, - offset: 0, - size: uint64(len(k1) + len(v1)), - keys: 1, + firstKey: k1, + lastKey: k1, + offset: 0, + size: uint64(len(k1) + len(v1)), + keys: 1, } require.Equal(t, expected, rc.props[0]) @@ -106,10 +109,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { require.NoError(t, err) require.Len(t, rc.props, 2) expected = &rangeProperty{ - key: k2, - offset: uint64(len(k1) + len(v1) + 16), - size: uint64(len(k2) + len(v2)), - keys: 1, + firstKey: k2, + lastKey: k2, + offset: uint64(len(k1) + len(v1) + 16), + size: uint64(len(k2) + len(v2)), + keys: 1, } require.Equal(t, expected, rc.props[1]) kvStore.Close() diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go index 1a7874d4ba..ee4830ee6b 100644 --- a/br/pkg/lightning/backend/external/iter.go +++ b/br/pkg/lightning/backend/external/iter.go @@ -317,7 +317,7 @@ func (i *MergeKVIter) Close() error { } func (p rangeProperty) sortKey() []byte { - return p.key + return p.firstKey } type statReaderProxy struct { diff --git a/br/pkg/lightning/backend/external/split.go b/br/pkg/lightning/backend/external/split.go new file mode 100644 index 0000000000..f04fc7bdc6 --- /dev/null +++ b/br/pkg/lightning/backend/external/split.go @@ -0,0 +1,239 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "container/heap" + "context" + "slices" + + "github.com/pingcap/tidb/br/pkg/storage" +) + +type exhaustedHeapElem struct { + key []byte + dataFile string + statFile string +} + +type exhaustedHeap []exhaustedHeapElem + +func (h exhaustedHeap) Len() int { + return len(h) +} + +func (h exhaustedHeap) Less(i, j int) bool { + return bytes.Compare(h[i].key, h[j].key) < 0 +} + +func (h exhaustedHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *exhaustedHeap) Push(x interface{}) { + *h = append(*h, x.(exhaustedHeapElem)) +} + +func (h *exhaustedHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +// RangeSplitter is used to split key ranges of an external engine. It will +// return one group of ranges by invoking `SplitOneRangesGroup` once. +type RangeSplitter struct { + rangesGroupSize int64 + rangesGroupKeys int64 + rangeSize int64 + rangeKeys int64 + + propIter *MergePropIter + dataFiles []string + statFiles []string + + // filename -> index in dataFiles/statFiles + activeDataFiles map[string]int + activeStatFiles map[string]int + curGroupSize int64 + curGroupKeys int64 + curRangeSize int64 + curRangeKeys int64 + recordSplitKeyAfterNextProp bool + lastDataFile string + lastStatFile string + lastHeapSize int + lastRangeProperty *rangeProperty + willExhaustHeap exhaustedHeap + + rangeSplitKeysBuf [][]byte +} + +// NewRangeSplitter creates a new RangeSplitter. +// `dataFiles` and `statFiles` must be corresponding to each other. +// `rangesGroupSize` and `rangesGroupKeys` controls the total range group +// size of one `SplitOneRangesGroup` invocation, while `rangeSize` and +// `rangeKeys` controls the size of one range. +func NewRangeSplitter( + ctx context.Context, + dataFiles, statFiles []string, + externalStorage storage.ExternalStorage, + rangesGroupSize, rangesGroupKeys int64, + maxRangeSize, maxRangeKeys int64, +) (*RangeSplitter, error) { + propIter, err := NewMergePropIter(ctx, statFiles, externalStorage) + if err != nil { + return nil, err + } + + return &RangeSplitter{ + rangesGroupSize: rangesGroupSize, + rangesGroupKeys: rangesGroupKeys, + propIter: propIter, + dataFiles: dataFiles, + statFiles: statFiles, + activeDataFiles: make(map[string]int), + activeStatFiles: make(map[string]int), + + rangeSize: maxRangeSize, + rangeKeys: maxRangeKeys, + rangeSplitKeysBuf: make([][]byte, 0, 16), + }, nil +} + +// Close release the resources of RangeSplitter. +func (r *RangeSplitter) Close() error { + return r.propIter.Close() +} + +// SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the +// end key of the group, but it will be nil when the group is the last one. +// `dataFiles` and `statFiles` are all the files that have overlapping key ranges +// in this group. +// `rangeSplitKeys` are the internal split keys of the ranges in this group. +func (r *RangeSplitter) SplitOneRangesGroup() ( + endKeyOfGroup []byte, + dataFiles []string, + statFiles []string, + rangeSplitKeys [][]byte, + err error, +) { + var ( + exhaustedDataFiles, exhaustedStatFiles []string + retDataFiles, retStatFiles []string + returnAfterNextProp = false + ) + + for r.propIter.Next() { + if err = r.propIter.Error(); err != nil { + return nil, nil, nil, nil, err + } + prop := r.propIter.prop() + r.curGroupSize += int64(prop.size) + r.curRangeSize += int64(prop.size) + r.curGroupKeys += int64(prop.keys) + r.curRangeKeys += int64(prop.keys) + + // a tricky way to detect source file will exhaust + heapSize := r.propIter.iter.h.Len() + if heapSize < r.lastHeapSize { + heap.Push(&r.willExhaustHeap, exhaustedHeapElem{ + key: r.lastRangeProperty.lastKey, + dataFile: r.lastDataFile, + statFile: r.lastStatFile, + }) + } + + fileIdx := r.propIter.readerIndex() + dataFilePath := r.dataFiles[fileIdx] + statFilePath := r.statFiles[fileIdx] + r.activeDataFiles[dataFilePath] = fileIdx + r.activeStatFiles[statFilePath] = fileIdx + r.lastDataFile = dataFilePath + r.lastStatFile = statFilePath + r.lastHeapSize = heapSize + r.lastRangeProperty = prop + + for r.willExhaustHeap.Len() > 0 && + bytes.Compare(r.willExhaustHeap[0].key, prop.firstKey) < 0 { + exhaustedDataFiles = append(exhaustedDataFiles, r.willExhaustHeap[0].dataFile) + exhaustedStatFiles = append(exhaustedStatFiles, r.willExhaustHeap[0].statFile) + heap.Pop(&r.willExhaustHeap) + } + + if returnAfterNextProp { + for _, p := range exhaustedDataFiles { + delete(r.activeDataFiles, p) + } + exhaustedDataFiles = exhaustedDataFiles[:0] + for _, p := range exhaustedStatFiles { + delete(r.activeStatFiles, p) + } + exhaustedStatFiles = exhaustedStatFiles[:0] + return prop.firstKey, retDataFiles, retStatFiles, r.takeSplitKeys(), nil + } + if r.recordSplitKeyAfterNextProp { + r.rangeSplitKeysBuf = append(r.rangeSplitKeysBuf, slices.Clone(prop.firstKey)) + r.recordSplitKeyAfterNextProp = false + } + + if r.curRangeSize >= r.rangeSize || r.curRangeKeys >= r.rangeKeys { + r.curRangeSize = 0 + r.curRangeKeys = 0 + r.recordSplitKeyAfterNextProp = true + } + + if r.curGroupSize >= r.rangesGroupSize || r.curGroupKeys >= r.rangesGroupKeys { + retDataFiles, retStatFiles = r.cloneActiveFiles() + + r.curGroupSize = 0 + r.curGroupKeys = 0 + returnAfterNextProp = true + } + } + + retDataFiles, retStatFiles = r.cloneActiveFiles() + r.activeDataFiles = make(map[string]int) + r.activeStatFiles = make(map[string]int) + return nil, retDataFiles, retStatFiles, r.takeSplitKeys(), r.propIter.Error() +} + +func (r *RangeSplitter) cloneActiveFiles() (data []string, stat []string) { + dataFiles := make([]string, 0, len(r.activeDataFiles)) + for path := range r.activeDataFiles { + dataFiles = append(dataFiles, path) + } + slices.SortFunc(dataFiles, func(i, j string) int { + return r.activeDataFiles[i] - r.activeDataFiles[j] + }) + statFiles := make([]string, 0, len(r.activeStatFiles)) + for path := range r.activeStatFiles { + statFiles = append(statFiles, path) + } + slices.SortFunc(statFiles, func(i, j string) int { + return r.activeStatFiles[i] - r.activeStatFiles[j] + }) + return dataFiles, statFiles +} + +func (r *RangeSplitter) takeSplitKeys() [][]byte { + ret := make([][]byte, len(r.rangeSplitKeysBuf)) + copy(ret, r.rangeSplitKeysBuf) + r.rangeSplitKeysBuf = r.rangeSplitKeysBuf[:0] + return ret +} diff --git a/br/pkg/lightning/backend/external/split_test.go b/br/pkg/lightning/backend/external/split_test.go new file mode 100644 index 0000000000..85f671546c --- /dev/null +++ b/br/pkg/lightning/backend/external/split_test.go @@ -0,0 +1,336 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/kv" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +func TestGeneralProperties(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + + ctx := context.Background() + memStore := storage.NewMemStorage() + + kvNum := rand.Intn(1000) + 100 + keys := make([][]byte, kvNum) + values := make([][]byte, kvNum) + for i := range keys { + keyLen := rand.Intn(100) + 1 + valueLen := rand.Intn(100) + 1 + keys[i] = make([]byte, keyLen) + values[i] = make([]byte, valueLen) + rand.Read(keys[i]) + rand.Read(values[i]) + } + + dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) + require.NoError(t, err) + splitter, err := NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1, + ) + var lastEndKey []byte +notExhausted: + endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + + // endKey should be strictly greater than lastEndKey + if lastEndKey != nil && endKey != nil { + cmp := bytes.Compare(endKey, lastEndKey) + require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey) + } + + // check dataFiles and statFiles + lenDataFiles := len(dataFiles) + lenStatFiles := len(statFiles) + require.Equal(t, lenDataFiles, lenStatFiles) + require.Greater(t, lenDataFiles, 0) + require.Greater(t, len(splitKeys), 0) + + // splitKeys should be strictly increasing + for i := 1; i < len(splitKeys); i++ { + cmp := bytes.Compare(splitKeys[i], splitKeys[i-1]) + require.Equal(t, 1, cmp, "splitKeys: %v", splitKeys) + } + // first splitKeys should be strictly greater than lastEndKey + cmp := bytes.Compare(splitKeys[0], lastEndKey) + require.Equal(t, 1, cmp, "splitKeys: %v, lastEndKey: %v", splitKeys, lastEndKey) + // last splitKeys should be strictly less than endKey + if endKey != nil { + cmp = bytes.Compare(splitKeys[len(splitKeys)-1], endKey) + require.Equal(t, -1, cmp, "splitKeys: %v, endKey: %v", splitKeys, endKey) + } + + lastEndKey = endKey + if endKey != nil { + goto notExhausted + } + require.NoError(t, splitter.Close()) +} + +func TestOnlyOneGroup(t *testing.T) { + ctx := context.Background() + memStore := storage.NewMemStorage() + subDir := "/mock-test" + + writer := NewWriterBuilder(). + SetMemorySizeLimit(15). + SetPropSizeDistance(1). + SetPropKeysDistance(1). + Build(memStore, subDir, 5) + + dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, [][]byte{{1}, {2}}, [][]byte{{1}, {2}}) + require.NoError(t, err) + + splitter, err := NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 10, + ) + require.NoError(t, err) + endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Len(t, dataFiles, 1) + require.Len(t, statFiles, 1) + require.Len(t, splitKeys, 0) + require.NoError(t, splitter.Close()) + + splitter, err = NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1, + ) + require.NoError(t, err) + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Len(t, dataFiles, 1) + require.Len(t, statFiles, 1) + require.Equal(t, [][]byte{{2}}, splitKeys) + require.NoError(t, splitter.Close()) +} + +func TestSortedData(t *testing.T) { + ctx := context.Background() + memStore := storage.NewMemStorage() + kvNum := 100 + + keys := make([][]byte, kvNum) + values := make([][]byte, kvNum) + for i := range keys { + keys[i] = []byte(fmt.Sprintf("key%03d", i)) + values[i] = []byte(fmt.Sprintf("value%03d", i)) + } + + dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) + require.NoError(t, err) + // we just need to make sure there are multiple files. + require.Greater(t, len(dataFiles), 1) + avgKVPerFile := math.Ceil(float64(kvNum) / float64(len(dataFiles))) + rangesGroupKV := 30 + groupFileNumUpperBound := int(math.Ceil(float64(rangesGroupKV-1)/avgKVPerFile)) + 1 + + splitter, err := NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, int64(rangesGroupKV), 1000, 10, + ) + require.NoError(t, err) + +notExhausted: + endKey, dataFiles, statFiles, _, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.LessOrEqual(t, len(dataFiles), groupFileNumUpperBound) + require.LessOrEqual(t, len(statFiles), groupFileNumUpperBound) + if endKey != nil { + goto notExhausted + } + require.NoError(t, splitter.Close()) +} + +func TestRangeSplitterStrictCase(t *testing.T) { + ctx := context.Background() + memStore := storage.NewMemStorage() + subDir := "/mock-test" + + writer1 := NewWriterBuilder(). + SetMemorySizeLimit(15). // slightly larger than len("key01") + len("value01") + SetPropSizeDistance(1). + SetPropKeysDistance(1). + Build(memStore, subDir, 1) + keys1 := [][]byte{ + []byte("key01"), []byte("key11"), []byte("key21"), + } + values1 := [][]byte{ + []byte("value01"), []byte("value11"), []byte("value21"), + } + dataFiles1, statFiles1, err := MockExternalEngineWithWriter(memStore, writer1, subDir, keys1, values1) + require.NoError(t, err) + require.Len(t, dataFiles1, 2) + require.Len(t, statFiles1, 2) + + writer2 := NewWriterBuilder(). + SetMemorySizeLimit(15). + SetPropSizeDistance(1). + SetPropKeysDistance(1). + Build(memStore, subDir, 2) + keys2 := [][]byte{ + []byte("key02"), []byte("key12"), []byte("key22"), + } + values2 := [][]byte{ + []byte("value02"), []byte("value12"), []byte("value22"), + } + dataFiles12, statFiles12, err := MockExternalEngineWithWriter(memStore, writer2, subDir, keys2, values2) + require.NoError(t, err) + require.Len(t, dataFiles12, 4) + require.Len(t, statFiles12, 4) + + writer3 := NewWriterBuilder(). + SetMemorySizeLimit(15). + SetPropSizeDistance(1). + SetPropKeysDistance(1). + Build(memStore, subDir, 3) + keys3 := [][]byte{ + []byte("key03"), []byte("key13"), []byte("key23"), + } + values3 := [][]byte{ + []byte("value03"), []byte("value13"), []byte("value23"), + } + dataFiles123, statFiles123, err := MockExternalEngineWithWriter(memStore, writer3, subDir, keys3, values3) + require.NoError(t, err) + require.Len(t, dataFiles123, 6) + require.Len(t, statFiles123, 6) + + // "/mock-test/X/0" contains "key0X" and "key1X" + // "/mock-test/X/1" contains "key2X" + require.Equal(t, []string{ + "/mock-test/1/0", "/mock-test/1/1", + "/mock-test/2/0", "/mock-test/2/1", + "/mock-test/3/0", "/mock-test/3/1", + }, dataFiles123) + + // group keys = 2, region keys = 1 + splitter, err := NewRangeSplitter( + ctx, dataFiles123, statFiles123, memStore, 1000, 2, 1000, 1, + ) + require.NoError(t, err) + + // [key01, key03), split at key02 + endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.EqualValues(t, kv.Key("key03"), endKey) + require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0"}, dataFiles) + require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0"}, statFiles) + require.Equal(t, [][]byte{[]byte("key02")}, splitKeys) + + // [key03, key12), split at key11 + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.EqualValues(t, kv.Key("key12"), endKey) + require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0", "/mock-test/3/0"}, dataFiles) + require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0", "/mock-test/3_stat/0"}, statFiles) + require.Equal(t, [][]byte{[]byte("key11")}, splitKeys) + + // [key12, key21), split at key13. the last key of "/mock-test/1/0" is "key11", + // so it's not used + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.EqualValues(t, kv.Key("key21"), endKey) + require.Equal(t, []string{"/mock-test/2/0", "/mock-test/3/0"}, dataFiles) + require.Equal(t, []string{"/mock-test/2_stat/0", "/mock-test/3_stat/0"}, statFiles) + require.Equal(t, [][]byte{[]byte("key13")}, splitKeys) + + // [key21, key23), split at key22. + // the last key of "/mock-test/2/0" is "key12", and the last key of "/mock-test/3/0" is "key13", + // so they are not used + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.EqualValues(t, kv.Key("key23"), endKey) + require.Equal(t, []string{"/mock-test/1/1", "/mock-test/2/1"}, dataFiles) + require.Equal(t, []string{"/mock-test/1_stat/1", "/mock-test/2_stat/1"}, statFiles) + require.Equal(t, [][]byte{[]byte("key22")}, splitKeys) + + // [key23, nil), no split key + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Equal(t, []string{"/mock-test/3/1"}, dataFiles) + require.Equal(t, []string{"/mock-test/3_stat/1"}, statFiles) + require.Len(t, splitKeys, 0) + + // read after drain all data + endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Len(t, dataFiles, 0) + require.Len(t, statFiles, 0) + require.Len(t, splitKeys, 0) + require.NoError(t, splitter.Close()) +} + +func TestExactlyKeyNum(t *testing.T) { + ctx := context.Background() + memStore := storage.NewMemStorage() + kvNum := 3 + + keys := make([][]byte, kvNum) + values := make([][]byte, kvNum) + for i := range keys { + keys[i] = []byte(fmt.Sprintf("key%03d", i)) + values[i] = []byte(fmt.Sprintf("value%03d", i)) + } + + subDir := "/mock-test" + + writer := NewWriterBuilder(). + SetMemorySizeLimit(15). + SetPropSizeDistance(1). + SetPropKeysDistance(1). + Build(memStore, subDir, 5) + + dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, keys, values) + require.NoError(t, err) + + // maxRangeKeys = 3 + splitter, err := NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, 100, 1000, 3, + ) + require.NoError(t, err) + endKey, splitDataFiles, splitStatFiles, splitKeys, err := splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Equal(t, dataFiles, splitDataFiles) + require.Equal(t, statFiles, splitStatFiles) + require.Len(t, splitKeys, 0) + + // rangesGroupKeys = 3 + splitter, err = NewRangeSplitter( + ctx, dataFiles, statFiles, memStore, 1000, 3, 1000, 1, + ) + require.NoError(t, err) + endKey, splitDataFiles, splitStatFiles, splitKeys, err = splitter.SplitOneRangesGroup() + require.NoError(t, err) + require.Nil(t, endKey) + require.Equal(t, dataFiles, splitDataFiles) + require.Equal(t, statFiles, splitStatFiles) + require.Equal(t, [][]byte{[]byte("key001"), []byte("key002")}, splitKeys) +} diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index 187ffcd29e..ca3deb5d52 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -67,7 +67,7 @@ func seekPropsOffsets( moved := false for iter.Next() { p := iter.prop() - propKey := kv.Key(p.key) + propKey := kv.Key(p.firstKey) if propKey.Cmp(start) > 0 { if !moved { return nil, fmt.Errorf("start key %s is too small for stat files %v", @@ -164,12 +164,25 @@ func MockExternalEngine( keys [][]byte, values [][]byte, ) (dataFiles []string, statsFiles []string, err error) { - ctx := context.Background() + subDir := "/mock-test" writer := NewWriterBuilder(). SetMemorySizeLimit(128). SetPropSizeDistance(32). SetPropKeysDistance(4). Build(storage, "/mock-test", 0) + return MockExternalEngineWithWriter(storage, writer, subDir, keys, values) +} + +// MockExternalEngineWithWriter generates an external engine with the given +// writer, keys and values. +func MockExternalEngineWithWriter( + storage storage.ExternalStorage, + writer *Writer, + subDir string, + keys [][]byte, + values [][]byte, +) (dataFiles []string, statsFiles []string, err error) { + ctx := context.Background() kvs := make([]common.KvPair, len(keys)) for i := range keys { kvs[i].Key = keys[i] @@ -184,5 +197,5 @@ func MockExternalEngine( if err != nil { return nil, nil, err } - return GetAllFileNames(ctx, storage, "/mock-test") + return GetAllFileNames(ctx, storage, subDir) } diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go index f110e433ed..379ccd7060 100644 --- a/br/pkg/lightning/backend/external/util_test.go +++ b/br/pkg/lightning/backend/external/util_test.go @@ -45,16 +45,16 @@ func TestSeekPropsOffsets(t *testing.T) { rc1 := &rangePropertiesCollector{ props: []*rangeProperty{ { - key: []byte("key1"), - offset: 10, + firstKey: []byte("key1"), + offset: 10, }, { - key: []byte("key3"), - offset: 30, + firstKey: []byte("key3"), + offset: 30, }, { - key: []byte("key5"), - offset: 50, + firstKey: []byte("key5"), + offset: 50, }, }, } @@ -69,12 +69,12 @@ func TestSeekPropsOffsets(t *testing.T) { rc2 := &rangePropertiesCollector{ props: []*rangeProperty{ { - key: []byte("key2"), - offset: 20, + firstKey: []byte("key2"), + offset: 20, }, { - key: []byte("key4"), - offset: 40, + firstKey: []byte("key4"), + offset: 40, }, }, }