lightning: port RangeSplitter (#46447)

ref pingcap/tidb#45719
This commit is contained in:
lance6716
2023-09-05 10:02:11 +08:00
committed by GitHub
parent 5706519375
commit 19e888f1fe
10 changed files with 686 additions and 59 deletions

View File

@ -9,6 +9,7 @@ go_library(
"file.go",
"iter.go",
"kv_reader.go",
"split.go",
"stat_reader.go",
"util.go",
"writer.go",
@ -46,12 +47,13 @@ go_test(
"engine_test.go",
"file_test.go",
"iter_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 22,
shard_count = 28,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",

View File

@ -18,11 +18,17 @@ import (
"encoding/binary"
)
// rangeProperty stores properties of a range:
// - key: the start key of the range.
// - offset: the start offset of the range in the file.
// - size: the size of the range.
// - keys: the number of keys in the range.
type rangeProperty struct {
key []byte
offset uint64
size uint64
keys uint64
firstKey []byte
lastKey []byte
offset uint64
size uint64
keys uint64
}
// decodeMultiProps is only used for test.
@ -40,22 +46,31 @@ func decodeMultiProps(data []byte) []*rangeProperty {
func decodeProp(data []byte) *rangeProperty {
rp := &rangeProperty{}
keyLen := binary.BigEndian.Uint32(data[0:4])
rp.key = data[4 : 4+keyLen]
rp.size = binary.BigEndian.Uint64(data[4+keyLen : 12+keyLen])
rp.keys = binary.BigEndian.Uint64(data[12+keyLen : 20+keyLen])
rp.offset = binary.BigEndian.Uint64(data[20+keyLen : 28+keyLen])
n := 0
keyLen := int(binary.BigEndian.Uint32(data[n : n+4]))
n += 4
rp.firstKey = data[n : n+keyLen]
n += keyLen
keyLen = int(binary.BigEndian.Uint32(data[n : n+4]))
n += 4
rp.lastKey = data[n : n+keyLen]
n += keyLen
rp.size = binary.BigEndian.Uint64(data[n : n+8])
n += 8
rp.keys = binary.BigEndian.Uint64(data[n : n+8])
n += 8
rp.offset = binary.BigEndian.Uint64(data[n : n+8])
return rp
}
// keyLen + p.size + p.keys + p.offset
const propertyLengthExceptKey = 4 + 8 + 8 + 8
// keyLen * 2 + p.size + p.keys + p.offset
const propertyLengthExceptKeys = 4*2 + 8 + 8 + 8
func encodeMultiProps(buf []byte, props []*rangeProperty) []byte {
var propLen [4]byte
for _, p := range props {
binary.BigEndian.PutUint32(propLen[:],
uint32(propertyLengthExceptKey+len(p.key)))
uint32(propertyLengthExceptKeys+len(p.firstKey)+len(p.lastKey)))
buf = append(buf, propLen[:4]...)
buf = encodeProp(buf, p)
}
@ -64,9 +79,12 @@ func encodeMultiProps(buf []byte, props []*rangeProperty) []byte {
func encodeProp(buf []byte, r *rangeProperty) []byte {
var b [8]byte
binary.BigEndian.PutUint32(b[:], uint32(len(r.key)))
binary.BigEndian.PutUint32(b[:], uint32(len(r.firstKey)))
buf = append(buf, b[:4]...)
buf = append(buf, r.key...)
buf = append(buf, r.firstKey...)
binary.BigEndian.PutUint32(b[:], uint32(len(r.lastKey)))
buf = append(buf, b[:4]...)
buf = append(buf, r.lastKey...)
binary.BigEndian.PutUint64(b[:], r.size)
buf = append(buf, b[:]...)
binary.BigEndian.PutUint64(b[:], r.keys)

View File

@ -23,10 +23,11 @@ import (
func TestRangePropertyCodec(t *testing.T) {
prop := &rangeProperty{
key: []byte("key"),
offset: 1,
size: 2,
keys: 3,
firstKey: []byte("key"),
lastKey: []byte("key2"),
offset: 1,
size: 2,
keys: 3,
}
buf := encodeProp(nil, prop)
prop2 := decodeProp(buf)
@ -34,7 +35,8 @@ func TestRangePropertyCodec(t *testing.T) {
p1, p2, p3 := &rangeProperty{}, &rangeProperty{}, &rangeProperty{}
for i, p := range []*rangeProperty{p1, p2, p3} {
p.key = []byte(fmt.Sprintf("key%d", i))
p.firstKey = []byte(fmt.Sprintf("key%d", i))
p.lastKey = []byte(fmt.Sprintf("key%d9", i))
p.offset = uint64(10 * i)
p.size = uint64(20 * i)
p.keys = uint64(30 * i)
@ -43,3 +45,9 @@ func TestRangePropertyCodec(t *testing.T) {
props := decodeMultiProps(buf)
require.EqualValues(t, []*rangeProperty{p1, p2, p3}, props)
}
func TestPropertyLengthExceptKeys(t *testing.T) {
zero := &rangeProperty{}
bs := encodeProp(nil, zero)
require.EqualValues(t, propertyLengthExceptKeys, len(bs))
}

View File

@ -57,36 +57,43 @@ func NewKeyValueStore(
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
kvLen := len(key) + len(value) + 16
var b [8]byte
var (
b [8]byte
kvLen = 0
)
// data layout: keyLen + key + valueLen + value
_, err := s.dataWriter.Write(
n, err := s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(key))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, key)
kvLen += n
n, err = s.dataWriter.Write(s.ctx, key)
if err != nil {
return err
}
_, err = s.dataWriter.Write(
kvLen += n
n, err = s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(value))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, value)
kvLen += n
n, err = s.dataWriter.Write(s.ctx, value)
if err != nil {
return err
}
kvLen += n
if len(s.rc.currProp.key) == 0 {
s.rc.currProp.key = key
if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key
s.offset += uint64(kvLen)
s.rc.currProp.size += uint64(len(key) + len(value))
@ -97,7 +104,7 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
newProp := *s.rc.currProp
s.rc.props = append(s.rc.props, &newProp)
s.rc.currProp.key = nil
s.rc.currProp.firstKey = nil
s.rc.currProp.offset = s.offset
s.rc.currProp.keys = 0
s.rc.currProp.size = 0

View File

@ -55,10 +55,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
firstKey: k1,
lastKey: k2,
offset: 0,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
}
require.Equal(t, expected, rc.props[0])
encoded = rc.encode()
@ -74,10 +75,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
kvStore.Close()
expected = &rangeProperty{
key: k3,
offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16),
size: uint64(len(k3) + len(v3)),
keys: 1,
firstKey: k3,
lastKey: k3,
offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16),
size: uint64(len(k3) + len(v3)),
keys: 1,
}
require.Len(t, rc.props, 2)
require.Equal(t, expected, rc.props[1])
@ -95,10 +97,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1)),
keys: 1,
firstKey: k1,
lastKey: k1,
offset: 0,
size: uint64(len(k1) + len(v1)),
keys: 1,
}
require.Equal(t, expected, rc.props[0])
@ -106,10 +109,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
key: k2,
offset: uint64(len(k1) + len(v1) + 16),
size: uint64(len(k2) + len(v2)),
keys: 1,
firstKey: k2,
lastKey: k2,
offset: uint64(len(k1) + len(v1) + 16),
size: uint64(len(k2) + len(v2)),
keys: 1,
}
require.Equal(t, expected, rc.props[1])
kvStore.Close()

View File

@ -317,7 +317,7 @@ func (i *MergeKVIter) Close() error {
}
func (p rangeProperty) sortKey() []byte {
return p.key
return p.firstKey
}
type statReaderProxy struct {

View File

@ -0,0 +1,239 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"bytes"
"container/heap"
"context"
"slices"
"github.com/pingcap/tidb/br/pkg/storage"
)
type exhaustedHeapElem struct {
key []byte
dataFile string
statFile string
}
type exhaustedHeap []exhaustedHeapElem
func (h exhaustedHeap) Len() int {
return len(h)
}
func (h exhaustedHeap) Less(i, j int) bool {
return bytes.Compare(h[i].key, h[j].key) < 0
}
func (h exhaustedHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *exhaustedHeap) Push(x interface{}) {
*h = append(*h, x.(exhaustedHeapElem))
}
func (h *exhaustedHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
// RangeSplitter is used to split key ranges of an external engine. It will
// return one group of ranges by invoking `SplitOneRangesGroup` once.
type RangeSplitter struct {
	// rangesGroupSize and rangesGroupKeys cap the accumulated size / key
	// count of one group returned by SplitOneRangesGroup.
	rangesGroupSize int64
	rangesGroupKeys int64
	// rangeSize and rangeKeys cap the accumulated size / key count of a
	// single range inside a group; crossing either threshold schedules a
	// new split key.
	rangeSize int64
	rangeKeys int64

	propIter  *MergePropIter
	dataFiles []string
	statFiles []string

	// filename -> index in dataFiles/statFiles
	activeDataFiles map[string]int
	activeStatFiles map[string]int

	// running totals accumulated since the last group / range boundary.
	curGroupSize int64
	curGroupKeys int64
	curRangeSize int64
	curRangeKeys int64
	// recordSplitKeyAfterNextProp requests that the next iterated
	// property contributes its firstKey as a range split key.
	recordSplitKeyAfterNextProp bool

	// last* remember the previously iterated property so that a shrink of
	// the merge heap can be attributed to the file that just ran out of
	// properties.
	lastDataFile      string
	lastStatFile      string
	lastHeapSize      int
	lastRangeProperty *rangeProperty
	willExhaustHeap   exhaustedHeap

	// rangeSplitKeysBuf accumulates split keys until takeSplitKeys drains it.
	rangeSplitKeysBuf [][]byte
}
// NewRangeSplitter creates a new RangeSplitter.
// `dataFiles` and `statFiles` must be corresponding to each other.
// `rangesGroupSize` and `rangesGroupKeys` controls the total range group
// size of one `SplitOneRangesGroup` invocation, while `maxRangeSize` and
// `maxRangeKeys` controls the size of one range.
func NewRangeSplitter(
	ctx context.Context,
	dataFiles, statFiles []string,
	externalStorage storage.ExternalStorage,
	rangesGroupSize, rangesGroupKeys int64,
	maxRangeSize, maxRangeKeys int64,
) (*RangeSplitter, error) {
	// The splitter iterates range properties from the stat files; the data
	// files are only used to report which files overlap each group.
	iter, err := NewMergePropIter(ctx, statFiles, externalStorage)
	if err != nil {
		return nil, err
	}
	splitter := &RangeSplitter{
		propIter:  iter,
		dataFiles: dataFiles,
		statFiles: statFiles,

		rangesGroupSize: rangesGroupSize,
		rangesGroupKeys: rangesGroupKeys,
		rangeSize:       maxRangeSize,
		rangeKeys:       maxRangeKeys,

		activeDataFiles:   make(map[string]int),
		activeStatFiles:   make(map[string]int),
		rangeSplitKeysBuf: make([][]byte, 0, 16),
	}
	return splitter, nil
}
// Close release the resources of RangeSplitter.
func (r *RangeSplitter) Close() error {
	// The only resource held is the underlying property iterator (and the
	// readers it owns), so closing it releases everything.
	return r.propIter.Close()
}
// SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the
// end key of the group, but it will be nil when the group is the last one.
// `dataFiles` and `statFiles` are all the files that have overlapping key ranges
// in this group.
// `rangeSplitKeys` are the internal split keys of the ranges in this group.
func (r *RangeSplitter) SplitOneRangesGroup() (
	endKeyOfGroup []byte,
	dataFiles []string,
	statFiles []string,
	rangeSplitKeys [][]byte,
	err error,
) {
	var (
		exhaustedDataFiles, exhaustedStatFiles []string
		retDataFiles, retStatFiles             []string
		returnAfterNextProp                    = false
	)
	for r.propIter.Next() {
		if err = r.propIter.Error(); err != nil {
			return nil, nil, nil, nil, err
		}
		prop := r.propIter.prop()
		// Accumulate this property's size/keys into both the group-level
		// and the range-level counters.
		r.curGroupSize += int64(prop.size)
		r.curRangeSize += int64(prop.size)
		r.curGroupKeys += int64(prop.keys)
		r.curRangeKeys += int64(prop.keys)

		// a tricky way to detect source file will exhaust
		heapSize := r.propIter.iter.h.Len()
		if heapSize < r.lastHeapSize {
			// The merge heap shrank, so the file that produced the previous
			// property has no more properties. Remember it keyed by its last
			// key so it can be retired once iteration moves past that key.
			heap.Push(&r.willExhaustHeap, exhaustedHeapElem{
				key:      r.lastRangeProperty.lastKey,
				dataFile: r.lastDataFile,
				statFile: r.lastStatFile,
			})
		}

		// Mark the file backing the current property as active for this group.
		fileIdx := r.propIter.readerIndex()
		dataFilePath := r.dataFiles[fileIdx]
		statFilePath := r.statFiles[fileIdx]
		r.activeDataFiles[dataFilePath] = fileIdx
		r.activeStatFiles[statFilePath] = fileIdx
		r.lastDataFile = dataFilePath
		r.lastStatFile = statFilePath
		r.lastHeapSize = heapSize
		r.lastRangeProperty = prop

		// Collect files whose last key is strictly below the current
		// property's first key: they cannot overlap the remaining key space.
		for r.willExhaustHeap.Len() > 0 &&
			bytes.Compare(r.willExhaustHeap[0].key, prop.firstKey) < 0 {
			exhaustedDataFiles = append(exhaustedDataFiles, r.willExhaustHeap[0].dataFile)
			exhaustedStatFiles = append(exhaustedStatFiles, r.willExhaustHeap[0].statFile)
			heap.Pop(&r.willExhaustHeap)
		}

		if returnAfterNextProp {
			// The group boundary was decided on the previous property; the
			// current property's firstKey becomes the group's end key. Drop
			// the exhausted files from the active sets before returning.
			for _, p := range exhaustedDataFiles {
				delete(r.activeDataFiles, p)
			}
			exhaustedDataFiles = exhaustedDataFiles[:0]
			for _, p := range exhaustedStatFiles {
				delete(r.activeStatFiles, p)
			}
			exhaustedStatFiles = exhaustedStatFiles[:0]
			return prop.firstKey, retDataFiles, retStatFiles, r.takeSplitKeys(), nil
		}
		if r.recordSplitKeyAfterNextProp {
			// The previous property filled a range; this property's firstKey
			// is the split point between the two ranges.
			r.rangeSplitKeysBuf = append(r.rangeSplitKeysBuf, slices.Clone(prop.firstKey))
			r.recordSplitKeyAfterNextProp = false
		}

		if r.curRangeSize >= r.rangeSize || r.curRangeKeys >= r.rangeKeys {
			// The current range is full; record a split key on the next
			// property so the range ends at a known key.
			r.curRangeSize = 0
			r.curRangeKeys = 0
			r.recordSplitKeyAfterNextProp = true
		}
		if r.curGroupSize >= r.rangesGroupSize || r.curGroupKeys >= r.rangesGroupKeys {
			// The current group is full; snapshot the active files now and
			// return on the next property, when the end key is known.
			retDataFiles, retStatFiles = r.cloneActiveFiles()
			r.curGroupSize = 0
			r.curGroupKeys = 0
			returnAfterNextProp = true
		}
	}

	// The iterator is exhausted: this is the last group, so endKeyOfGroup
	// is nil and the active sets are reset.
	retDataFiles, retStatFiles = r.cloneActiveFiles()
	r.activeDataFiles = make(map[string]int)
	r.activeStatFiles = make(map[string]int)
	return nil, retDataFiles, retStatFiles, r.takeSplitKeys(), r.propIter.Error()
}
// cloneActiveFiles snapshots the active data and stat file paths, each list
// sorted by the file's index in dataFiles/statFiles.
func (r *RangeSplitter) cloneActiveFiles() (data []string, stat []string) {
	sortedPaths := func(active map[string]int) []string {
		paths := make([]string, 0, len(active))
		for p := range active {
			paths = append(paths, p)
		}
		slices.SortFunc(paths, func(a, b string) int {
			return active[a] - active[b]
		})
		return paths
	}
	return sortedPaths(r.activeDataFiles), sortedPaths(r.activeStatFiles)
}
// takeSplitKeys returns the split keys collected so far and resets the
// buffer (retaining its capacity) for the next group.
func (r *RangeSplitter) takeSplitKeys() [][]byte {
	taken := make([][]byte, len(r.rangeSplitKeysBuf))
	copy(taken, r.rangeSplitKeysBuf)
	r.rangeSplitKeysBuf = r.rangeSplitKeysBuf[:0]
	return taken
}

View File

@ -0,0 +1,336 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"bytes"
"context"
"fmt"
"math"
"testing"
"time"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)
// TestGeneralProperties feeds random KV data through a RangeSplitter and
// checks the ordering invariants of the returned end keys and split keys.
func TestGeneralProperties(t *testing.T) {
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	t.Logf("seed: %d", seed)
	ctx := context.Background()
	memStore := storage.NewMemStorage()

	// Generate a random number of random-length KV pairs.
	kvNum := rand.Intn(1000) + 100
	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keyLen := rand.Intn(100) + 1
		valueLen := rand.Intn(100) + 1
		keys[i] = make([]byte, keyLen)
		values[i] = make([]byte, valueLen)
		rand.Read(keys[i])
		rand.Read(values[i])
	}

	dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values)
	require.NoError(t, err)
	splitter, err := NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1,
	)
	// Bug fix: this constructor error was previously ignored, unlike in the
	// sibling tests.
	require.NoError(t, err)

	var lastEndKey []byte
notExhausted:
	endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)

	// endKey should be strictly greater than lastEndKey
	if lastEndKey != nil && endKey != nil {
		cmp := bytes.Compare(endKey, lastEndKey)
		require.Equal(t, 1, cmp, "endKey: %v, lastEndKey: %v", endKey, lastEndKey)
	}

	// check dataFiles and statFiles
	lenDataFiles := len(dataFiles)
	lenStatFiles := len(statFiles)
	require.Equal(t, lenDataFiles, lenStatFiles)
	require.Greater(t, lenDataFiles, 0)
	require.Greater(t, len(splitKeys), 0)

	// splitKeys should be strictly increasing
	for i := 1; i < len(splitKeys); i++ {
		cmp := bytes.Compare(splitKeys[i], splitKeys[i-1])
		require.Equal(t, 1, cmp, "splitKeys: %v", splitKeys)
	}
	// first splitKeys should be strictly greater than lastEndKey
	cmp := bytes.Compare(splitKeys[0], lastEndKey)
	require.Equal(t, 1, cmp, "splitKeys: %v, lastEndKey: %v", splitKeys, lastEndKey)
	// last splitKeys should be strictly less than endKey
	if endKey != nil {
		cmp = bytes.Compare(splitKeys[len(splitKeys)-1], endKey)
		require.Equal(t, -1, cmp, "splitKeys: %v, endKey: %v", splitKeys, endKey)
	}

	// A nil endKey marks the final group; otherwise continue draining.
	lastEndKey = endKey
	if endKey != nil {
		goto notExhausted
	}
	require.NoError(t, splitter.Close())
}
// TestOnlyOneGroup checks a two-key engine that always fits into a single
// group, with and without internal range split keys.
func TestOnlyOneGroup(t *testing.T) {
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	subDir := "/mock-test"

	writer := NewWriterBuilder().
		SetMemorySizeLimit(15).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		Build(memStore, subDir, 5)
	dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, [][]byte{{1}, {2}}, [][]byte{{1}, {2}})
	require.NoError(t, err)

	// maxRangeKeys = 10: both KV pairs fit into one range, so the single
	// (and last) group has a nil end key and no split keys.
	splitter, err := NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 10,
	)
	require.NoError(t, err)
	endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 1)
	require.Len(t, statFiles, 1)
	require.Len(t, splitKeys, 0)
	require.NoError(t, splitter.Close())

	// maxRangeKeys = 1: each key fills a range, so {2} becomes an internal
	// split key. Note dataFiles/statFiles were reassigned above to the
	// single-file lists returned by the first splitter.
	splitter, err = NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1,
	)
	require.NoError(t, err)
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 1)
	require.Len(t, statFiles, 1)
	require.Equal(t, [][]byte{{2}}, splitKeys)
	require.NoError(t, splitter.Close())
}
// TestSortedData checks that with pre-sorted non-overlapping input, the
// number of files per group stays within the computed upper bound.
func TestSortedData(t *testing.T) {
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	kvNum := 100
	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keys[i] = []byte(fmt.Sprintf("key%03d", i))
		values[i] = []byte(fmt.Sprintf("value%03d", i))
	}

	dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values)
	require.NoError(t, err)
	// we just need to make sure there are multiple files.
	require.Greater(t, len(dataFiles), 1)
	avgKVPerFile := math.Ceil(float64(kvNum) / float64(len(dataFiles)))
	rangesGroupKV := 30
	// One group holds at most rangesGroupKV-1 keys before closing, plus one
	// extra file for the property that triggers the boundary.
	groupFileNumUpperBound := int(math.Ceil(float64(rangesGroupKV-1)/avgKVPerFile)) + 1

	splitter, err := NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, int64(rangesGroupKV), 1000, 10,
	)
	require.NoError(t, err)

	// Drain all groups; a nil end key marks the last one.
	for {
		endKey, groupDataFiles, groupStatFiles, _, err := splitter.SplitOneRangesGroup()
		require.NoError(t, err)
		require.LessOrEqual(t, len(groupDataFiles), groupFileNumUpperBound)
		require.LessOrEqual(t, len(groupStatFiles), groupFileNumUpperBound)
		if endKey == nil {
			break
		}
	}
	require.NoError(t, splitter.Close())
}
// TestRangeSplitterStrictCase builds three interleaved writers (key01/11/21,
// key02/12/22, key03/13/23) and asserts the exact end key, file set, and
// split keys of every group.
func TestRangeSplitterStrictCase(t *testing.T) {
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	subDir := "/mock-test"

	writer1 := NewWriterBuilder().
		SetMemorySizeLimit(15). // slightly larger than len("key01") + len("value01")
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		Build(memStore, subDir, 1)
	keys1 := [][]byte{
		[]byte("key01"), []byte("key11"), []byte("key21"),
	}
	values1 := [][]byte{
		[]byte("value01"), []byte("value11"), []byte("value21"),
	}
	dataFiles1, statFiles1, err := MockExternalEngineWithWriter(memStore, writer1, subDir, keys1, values1)
	require.NoError(t, err)
	require.Len(t, dataFiles1, 2)
	require.Len(t, statFiles1, 2)

	writer2 := NewWriterBuilder().
		SetMemorySizeLimit(15).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		Build(memStore, subDir, 2)
	keys2 := [][]byte{
		[]byte("key02"), []byte("key12"), []byte("key22"),
	}
	values2 := [][]byte{
		[]byte("value02"), []byte("value12"), []byte("value22"),
	}
	// The returned lists are cumulative: they contain writer1's files too,
	// since all writers share subDir.
	dataFiles12, statFiles12, err := MockExternalEngineWithWriter(memStore, writer2, subDir, keys2, values2)
	require.NoError(t, err)
	require.Len(t, dataFiles12, 4)
	require.Len(t, statFiles12, 4)

	writer3 := NewWriterBuilder().
		SetMemorySizeLimit(15).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		Build(memStore, subDir, 3)
	keys3 := [][]byte{
		[]byte("key03"), []byte("key13"), []byte("key23"),
	}
	values3 := [][]byte{
		[]byte("value03"), []byte("value13"), []byte("value23"),
	}
	dataFiles123, statFiles123, err := MockExternalEngineWithWriter(memStore, writer3, subDir, keys3, values3)
	require.NoError(t, err)
	require.Len(t, dataFiles123, 6)
	require.Len(t, statFiles123, 6)

	// "/mock-test/X/0" contains "key0X" and "key1X"
	// "/mock-test/X/1" contains "key2X"
	require.Equal(t, []string{
		"/mock-test/1/0", "/mock-test/1/1",
		"/mock-test/2/0", "/mock-test/2/1",
		"/mock-test/3/0", "/mock-test/3/1",
	}, dataFiles123)

	// group keys = 2, region keys = 1
	splitter, err := NewRangeSplitter(
		ctx, dataFiles123, statFiles123, memStore, 1000, 2, 1000, 1,
	)
	require.NoError(t, err)

	// [key01, key03), split at key02
	endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key03"), endKey)
	require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0"}, dataFiles)
	require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0"}, statFiles)
	require.Equal(t, [][]byte{[]byte("key02")}, splitKeys)

	// [key03, key12), split at key11
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key12"), endKey)
	require.Equal(t, []string{"/mock-test/1/0", "/mock-test/2/0", "/mock-test/3/0"}, dataFiles)
	require.Equal(t, []string{"/mock-test/1_stat/0", "/mock-test/2_stat/0", "/mock-test/3_stat/0"}, statFiles)
	require.Equal(t, [][]byte{[]byte("key11")}, splitKeys)

	// [key12, key21), split at key13. the last key of "/mock-test/1/0" is "key11",
	// so it's not used
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key21"), endKey)
	require.Equal(t, []string{"/mock-test/2/0", "/mock-test/3/0"}, dataFiles)
	require.Equal(t, []string{"/mock-test/2_stat/0", "/mock-test/3_stat/0"}, statFiles)
	require.Equal(t, [][]byte{[]byte("key13")}, splitKeys)

	// [key21, key23), split at key22.
	// the last key of "/mock-test/2/0" is "key12", and the last key of "/mock-test/3/0" is "key13",
	// so they are not used
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.EqualValues(t, kv.Key("key23"), endKey)
	require.Equal(t, []string{"/mock-test/1/1", "/mock-test/2/1"}, dataFiles)
	require.Equal(t, []string{"/mock-test/1_stat/1", "/mock-test/2_stat/1"}, statFiles)
	require.Equal(t, [][]byte{[]byte("key22")}, splitKeys)

	// [key23, nil), no split key
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, []string{"/mock-test/3/1"}, dataFiles)
	require.Equal(t, []string{"/mock-test/3_stat/1"}, statFiles)
	require.Len(t, splitKeys, 0)

	// read after drain all data
	endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Len(t, dataFiles, 0)
	require.Len(t, statFiles, 0)
	require.Len(t, splitKeys, 0)
	require.NoError(t, splitter.Close())
}
// TestExactlyKeyNum checks the boundary where the key count exactly equals
// the configured range / group limits.
func TestExactlyKeyNum(t *testing.T) {
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	kvNum := 3

	keys := make([][]byte, kvNum)
	values := make([][]byte, kvNum)
	for i := range keys {
		keys[i] = []byte(fmt.Sprintf("key%03d", i))
		values[i] = []byte(fmt.Sprintf("value%03d", i))
	}

	subDir := "/mock-test"

	writer := NewWriterBuilder().
		SetMemorySizeLimit(15).
		SetPropSizeDistance(1).
		SetPropKeysDistance(1).
		Build(memStore, subDir, 5)
	dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, keys, values)
	require.NoError(t, err)

	// maxRangeKeys = 3: exactly 3 keys fill the single range, but no fourth
	// property arrives to record a split key, so splitKeys is empty.
	splitter, err := NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, 100, 1000, 3,
	)
	require.NoError(t, err)
	endKey, splitDataFiles, splitStatFiles, splitKeys, err := splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, dataFiles, splitDataFiles)
	require.Equal(t, statFiles, splitStatFiles)
	require.Len(t, splitKeys, 0)

	// rangesGroupKeys = 3, maxRangeKeys = 1: still one group (nil end key),
	// with each subsequent key becoming an internal split key.
	splitter, err = NewRangeSplitter(
		ctx, dataFiles, statFiles, memStore, 1000, 3, 1000, 1,
	)
	require.NoError(t, err)
	endKey, splitDataFiles, splitStatFiles, splitKeys, err = splitter.SplitOneRangesGroup()
	require.NoError(t, err)
	require.Nil(t, endKey)
	require.Equal(t, dataFiles, splitDataFiles)
	require.Equal(t, statFiles, splitStatFiles)
	require.Equal(t, [][]byte{[]byte("key001"), []byte("key002")}, splitKeys)
}

View File

@ -67,7 +67,7 @@ func seekPropsOffsets(
moved := false
for iter.Next() {
p := iter.prop()
propKey := kv.Key(p.key)
propKey := kv.Key(p.firstKey)
if propKey.Cmp(start) > 0 {
if !moved {
return nil, fmt.Errorf("start key %s is too small for stat files %v",
@ -164,12 +164,25 @@ func MockExternalEngine(
keys [][]byte,
values [][]byte,
) (dataFiles []string, statsFiles []string, err error) {
ctx := context.Background()
subDir := "/mock-test"
writer := NewWriterBuilder().
SetMemorySizeLimit(128).
SetPropSizeDistance(32).
SetPropKeysDistance(4).
Build(storage, "/mock-test", 0)
return MockExternalEngineWithWriter(storage, writer, subDir, keys, values)
}
// MockExternalEngineWithWriter generates an external engine with the given
// writer, keys and values.
func MockExternalEngineWithWriter(
storage storage.ExternalStorage,
writer *Writer,
subDir string,
keys [][]byte,
values [][]byte,
) (dataFiles []string, statsFiles []string, err error) {
ctx := context.Background()
kvs := make([]common.KvPair, len(keys))
for i := range keys {
kvs[i].Key = keys[i]
@ -184,5 +197,5 @@ func MockExternalEngine(
if err != nil {
return nil, nil, err
}
return GetAllFileNames(ctx, storage, "/mock-test")
return GetAllFileNames(ctx, storage, subDir)
}

View File

@ -45,16 +45,16 @@ func TestSeekPropsOffsets(t *testing.T) {
rc1 := &rangePropertiesCollector{
props: []*rangeProperty{
{
key: []byte("key1"),
offset: 10,
firstKey: []byte("key1"),
offset: 10,
},
{
key: []byte("key3"),
offset: 30,
firstKey: []byte("key3"),
offset: 30,
},
{
key: []byte("key5"),
offset: 50,
firstKey: []byte("key5"),
offset: 50,
},
},
}
@ -69,12 +69,12 @@ func TestSeekPropsOffsets(t *testing.T) {
rc2 := &rangePropertiesCollector{
props: []*rangeProperty{
{
key: []byte("key2"),
offset: 20,
firstKey: []byte("key2"),
offset: 20,
},
{
key: []byte("key4"),
offset: 40,
firstKey: []byte("key4"),
offset: 40,
},
},
}