br/pkg/lightning/backend/external/engine.go (vendored, 4 changes)
@@ -227,7 +227,7 @@ func getFilesReadConcurrency(
 	startKey, endKey []byte,
 ) ([]uint64, []uint64, error) {
 	result := make([]uint64, len(statsFiles))
-	offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage, false)
+	offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -398,7 +398,7 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
 		logger.Info("no stats files",
 			zap.String("startKey", hex.EncodeToString(start)))
 	} else {
-		offs, err := seekPropsOffsets(ctx, []kv.Key{start}, e.statsFiles, e.storage, e.checkHotspot)
+		offs, err := seekPropsOffsets(ctx, []kv.Key{start}, e.statsFiles, e.storage)
		if err != nil {
			return nil, errors.Trace(err)
		}
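Both call sites above now pass just four arguments; the trailing hotspot flag is gone. The shape of the returned value is unchanged: one row per requested start key, one column per stats file. A minimal runnable sketch of that indexing, with values borrowed from the test expectations further down (variable names are illustrative, not from the patch):

```go
package main

import "fmt"

func main() {
	// Hypothetical seekPropsOffsets result for two start keys over two
	// stats files: offsets[i][j] is the offset in file j for start key i.
	offsets := [][]uint64{
		{10, 20}, // per-file offsets for the first key
		{30, 20}, // per-file offsets for the second key
	}
	startOffsets, endOffsets := offsets[0], offsets[1]
	fmt.Println(startOffsets, endOffsets) // [10 20] [30 20]
}
```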
br/pkg/lightning/backend/external/util.go (vendored, 153 changes)
@@ -18,31 +18,41 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"slices"
 	"sort"
 	"strconv"
 	"strings"
 
 	"github.com/docker/go-units"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/hack"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
 
-// seekPropsOffsets seeks the statistic files to find the largest offset of
-// sorted data file offsets such that the key at offset is less than or equal to
-// the given start keys. Caller can specify multiple ascending keys and
-// seekPropsOffsets will return the offsets list for each key.
+// seekPropsOffsets reads the statistic files to find the largest offset of
+// corresponding sorted data file such that the key at offset is less than or
+// equal to the given start keys. These returned offsets can be used to seek data
+// file reader, read, parse and skip few smaller keys, and then locate the needed
+// data.
+//
+// To avoid potential data loss, it also checks at least one statistic file has a
+// key larger than or equal to the start key. If not, we are afraid that some
+// paths are missing, and the data between [start key, min(first key of
+// statistic files)) are lost.
+//
+// Caller can specify multiple ascending keys and seekPropsOffsets will return
+// the offsets list per file for each key.
 func seekPropsOffsets(
 	ctx context.Context,
 	starts []kv.Key,
 	paths []string,
 	exStorage storage.ExternalStorage,
-	checkHotSpot bool,
 ) (_ [][]uint64, err error) {
 	logger := logutil.Logger(ctx)
 	task := log.BeginTask(logger, "seek props offsets")
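The comment above describes a per-file rule: for each start key, pick the offset of the last property whose first key is not larger than that key. A small self-contained sketch of that rule on an in-memory property list (a toy `prop` type and `seekOne` helper, not the real `newStatsReader` path; the too-small-start-key check is omitted here):

```go
package main

import (
	"bytes"
	"fmt"
)

// prop mirrors the shape used in the stats files: the first key covered by
// a block of sorted data and that block's offset in the data file.
type prop struct {
	firstKey []byte
	offset   uint64
}

// seekOne returns the largest offset whose firstKey <= start, mimicking the
// per-file scan in seekPropsOffsets for a single start key.
func seekOne(props []prop, start []byte) uint64 {
	var off uint64
	for _, p := range props {
		if bytes.Compare(p.firstKey, start) > 0 {
			break
		}
		off = p.offset
	}
	return off
}

func main() {
	props := []prop{
		{[]byte("key1"), 10},
		{[]byte("key3"), 30},
		{[]byte("key5"), 50},
	}
	fmt.Println(seekOne(props, []byte("key2.5"))) // 10: key1 <= key2.5 < key3
	fmt.Println(seekOne(props, []byte("key3")))   // 30: an exact match counts
	fmt.Println(seekOne(props, []byte("key999"))) // 50: past the last property
}
```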
@@ -50,60 +60,99 @@ func seekPropsOffsets(
 	defer func() {
 		task.End(zapcore.ErrorLevel, err)
 	}()
 
-	// adapt the NewMergePropIter argument types
-	multiFileStat := MultipleFilesStat{Filenames: make([][2]string, 0, len(paths))}
-	for _, path := range paths {
-		multiFileStat.Filenames = append(multiFileStat.Filenames, [2]string{"", path})
+	offsetsPerFile := make([][]uint64, len(paths))
+	for i := range offsetsPerFile {
+		offsetsPerFile[i] = make([]uint64, len(starts))
 	}
-	iter, err := NewMergePropIter(ctx, []MultipleFilesStat{multiFileStat}, exStorage, checkHotSpot)
-	if err != nil {
+
+	// Record first key if it is smaller than first key of "starts" key argument for
+	// each file, and check all files afterward.
+	firstKeyTooSmallCheckers := make([]kv.Key, len(paths))
+	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
+	for i := range paths {
+		i := i
+		eg.Go(func() error {
+			r, err2 := newStatsReader(egCtx, exStorage, paths[i], 250*1024)
+			if err2 != nil {
+				if err2 == io.EOF {
+					return nil
+				}
+				return errors.Trace(err2)
+			}
+			defer r.Close()
+
+			moved := false
+			keyIdx := 0
+			curKey := starts[keyIdx]
+
+			p, err3 := r.nextProp()
+			for {
+				switch err3 {
+				case nil:
+				case io.EOF:
+					// fill the rest of the offsets with the last offset
+					currOffset := offsetsPerFile[i][keyIdx]
+					for keyIdx++; keyIdx < len(starts); keyIdx++ {
+						offsetsPerFile[i][keyIdx] = currOffset
+					}
+					return nil
+				default:
+					return errors.Trace(err3)
+				}
+				propKey := kv.Key(p.firstKey)
+				for propKey.Cmp(curKey) > 0 {
+					if !moved {
+						if firstKeyTooSmallCheckers[i] == nil {
+							firstKeyTooSmallCheckers[i] = propKey
+						}
+					}
+					keyIdx++
+					if keyIdx >= len(starts) {
+						return nil
+					}
+					offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1]
+					curKey = starts[keyIdx]
+				}
+				moved = true
+				offsetsPerFile[i][keyIdx] = p.offset
+				p, err3 = r.nextProp()
+			}
+		})
+	}
+
+	if err = eg.Wait(); err != nil {
 		return nil, err
 	}
-	defer func() {
-		if err := iter.Close(); err != nil {
-			logger.Warn("failed to close merge prop iterator", zap.Error(err))
-		}
-	}()
-	offsets4AllKey := make([][]uint64, 0, len(starts))
-	offsets := make([]uint64, len(paths))
-	offsets4AllKey = append(offsets4AllKey, offsets)
-	moved := false
-
-	keyIdx := 0
-	curKey := starts[keyIdx]
-	for iter.Next() {
-		p := iter.prop()
-		propKey := kv.Key(p.firstKey)
-		for propKey.Cmp(curKey) > 0 {
-			if !moved {
-				return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s",
-					curKey.String(),
-					paths,
-					propKey.String(),
-				)
-			}
-			keyIdx++
-			if keyIdx >= len(starts) {
-				return offsets4AllKey, nil
-			}
-			curKey = starts[keyIdx]
-			newOffsets := slices.Clone(offsets)
-			offsets4AllKey = append(offsets4AllKey, newOffsets)
-			offsets = newOffsets
-		}
-		moved = true
-		_, idx := iter.readerIndex()
-		offsets[idx] = p.offset
-	}
-	if iter.Error() != nil {
-		return nil, iter.Error()
-	}
-	for len(offsets4AllKey) < len(starts) {
-		newOffsets := slices.Clone(offsets)
-		offsets4AllKey = append(offsets4AllKey, newOffsets)
-		offsets = newOffsets
-	}
-	return offsets4AllKey, nil
+	hasNil := false
+	for _, k := range firstKeyTooSmallCheckers {
+		if k == nil {
+			hasNil = true
+			break
+		}
+	}
+	if !hasNil {
+		minKey := firstKeyTooSmallCheckers[0]
+		for _, k := range firstKeyTooSmallCheckers[1:] {
+			if k.Cmp(minKey) < 0 {
+				minKey = k
+			}
+		}
+		return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s",
+			starts[0].String(),
+			paths,
+			minKey.String(),
+		)
+	}
+
+	// TODO(lance6716): change the caller so we don't need to transpose the result
+	offsetsPerKey := make([][]uint64, len(starts))
+	for i := range starts {
+		offsetsPerKey[i] = make([]uint64, len(paths))
+		for j := range paths {
+			offsetsPerKey[i][j] = offsetsPerFile[j][i]
+		}
+	}
+	return offsetsPerKey, nil
 }
 
 // GetAllFileNames returns data file paths and stat file paths. Both paths are
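The new implementation above fans out one goroutine per stats file and needs no locking, since goroutine i writes only to offsetsPerFile[i]; the per-file rows are transposed into the caller-facing [keyIdx][fileIdx] layout at the end. A minimal runnable sketch of that fan-out-then-transpose shape, using golang.org/x/sync/errgroup as a stand-in for util.NewErrorGroupWithRecoverWithCtx and stubbing out the actual file scan:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	paths := []string{"/test1", "/test2"}
	starts := []string{"key2.5", "key3"}

	// One row per file; each goroutine owns exactly one row, so no mutex.
	offsetsPerFile := make([][]uint64, len(paths))
	for i := range offsetsPerFile {
		offsetsPerFile[i] = make([]uint64, len(starts))
	}

	eg, _ := errgroup.WithContext(context.Background())
	for i := range paths {
		i := i
		eg.Go(func() error {
			// Stub for the real per-file scan: fill this file's row only.
			for j := range starts {
				offsetsPerFile[i][j] = uint64(10 * (i + j + 1))
			}
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		panic(err)
	}

	// Transpose to the [keyIdx][fileIdx] layout the callers expect, as the
	// TODO(lance6716) comment in the patch notes.
	offsetsPerKey := make([][]uint64, len(starts))
	for i := range starts {
		offsetsPerKey[i] = make([]uint64, len(paths))
		for j := range paths {
			offsetsPerKey[i][j] = offsetsPerFile[j][i]
		}
	}
	fmt.Println(offsetsPerKey) // [[10 20] [20 30]]
}
```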
br/pkg/lightning/backend/external/util_test.go (vendored, 22 changes)
@@ -71,37 +71,37 @@ func TestSeekPropsOffsets(t *testing.T) {
 	err = w2.Close(ctx)
 	require.NoError(t, err)
 
-	got, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5")}, []string{file1, file2}, store, true)
+	got, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 20}}, got)
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key2.6")}, []string{file1, file2}, store, true)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key2.6")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 20}, {10, 20}}, got)
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2}, store, true)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{30, 20}}, got)
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key3")}, []string{file1, file2}, store, true)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key3")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got)
 
-	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store, true)
+	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
 	require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store, false)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 0}}, got)
 
-	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store, true)
+	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
 	require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store, false)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{50, 40}}, got)
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999"), []byte("key999")}, []string{file1, file2}, store, false)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999"), []byte("key999")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{50, 40}, {50, 40}}, got)
 
@@ -118,11 +118,11 @@ func TestSeekPropsOffsets(t *testing.T) {
 	require.NoError(t, err)
 	err = w4.Close(ctx)
 	require.NoError(t, err)
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2, file3, file4}, store, true)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2, file3, file4}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{30, 20, 0, 30}}, got)
 
-	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3"), []byte("key999")}, []string{file1, file2, file3, file4}, store, true)
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3"), []byte("key999")}, []string{file1, file2, file3, file4}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{30, 20, 0, 30}, {50, 40, 0, 50}}, got)
 }
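The key999 cases exercise the io.EOF branch of the new scan: once a stats file has no more properties, every remaining start key keeps the last offset recorded for that file, which is why repeated or past-the-end keys return identical rows. A tiny runnable sketch of that fill-forward rule (toy data, not the real reader):

```go
package main

import "fmt"

// fillForward mimics the io.EOF branch in seekPropsOffsets: once a stats
// file runs out of properties at keyIdx, every later start key inherits
// the offset already recorded for keyIdx.
func fillForward(offsets []uint64, keyIdx int) {
	curr := offsets[keyIdx]
	for k := keyIdx + 1; k < len(offsets); k++ {
		offsets[k] = curr
	}
}

func main() {
	// Offsets for one file and three start keys; the scan stopped at index 0.
	offs := []uint64{50, 0, 0}
	fillForward(offs, 0)
	fmt.Println(offs) // [50 50 50]
}
```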