diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 98fdc27dd1..b6ed5d1065 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -227,7 +227,7 @@ func getFilesReadConcurrency( startKey, endKey []byte, ) ([]uint64, []uint64, error) { result := make([]uint64, len(statsFiles)) - offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage, false) + offsets, err := seekPropsOffsets(ctx, []kv.Key{startKey, endKey}, statsFiles, storage) if err != nil { return nil, nil, err } @@ -398,7 +398,7 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte logger.Info("no stats files", zap.String("startKey", hex.EncodeToString(start))) } else { - offs, err := seekPropsOffsets(ctx, []kv.Key{start}, e.statsFiles, e.storage, e.checkHotspot) + offs, err := seekPropsOffsets(ctx, []kv.Key{start}, e.statsFiles, e.storage) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index 3a6fa7e41e..8433dc262a 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -18,31 +18,41 @@ import ( "bytes" "context" "fmt" + "io" "slices" "sort" "strconv" "strings" "github.com/docker/go-units" + "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -// seekPropsOffsets seeks the statistic files to find the largest offset of -// sorted data file offsets such that the key at offset is less than or equal to -// the given start keys. Caller can specify multiple ascending keys and -// seekPropsOffsets will return the offsets list for each key. 
+// seekPropsOffsets reads the statistic files to find the largest offset of +// corresponding sorted data file such that the key at offset is less than or +// equal to the given start keys. These returned offsets can be used to seek data +// file reader, read, parse and skip few smaller keys, and then locate the needed +// data. +// +// To avoid potential data loss, it also checks at least one statistic file has a +// key larger than or equal to the start key. If not, we are afraid that some +// paths are missing, and the data between [start key, min(first key of +// statistic files)) are lost. +// +// Caller can specify multiple ascending keys and seekPropsOffsets will return +// the offsets list per file for each key. func seekPropsOffsets( ctx context.Context, starts []kv.Key, paths []string, exStorage storage.ExternalStorage, - checkHotSpot bool, ) (_ [][]uint64, err error) { logger := logutil.Logger(ctx) task := log.BeginTask(logger, "seek props offsets") @@ -50,60 +60,99 @@ func seekPropsOffsets( task.End(zapcore.ErrorLevel, err) }() - // adapt the NewMergePropIter argument types - multiFileStat := MultipleFilesStat{Filenames: make([][2]string, 0, len(paths))} - for _, path := range paths { - multiFileStat.Filenames = append(multiFileStat.Filenames, [2]string{"", path}) + offsetsPerFile := make([][]uint64, len(paths)) + for i := range offsetsPerFile { + offsetsPerFile[i] = make([]uint64, len(starts)) } - iter, err := NewMergePropIter(ctx, []MultipleFilesStat{multiFileStat}, exStorage, checkHotSpot) - if err != nil { + // Record first key if it is smaller than first key of "starts" key argument for + // each file, and check all files afterward. 
+ firstKeyTooSmallCheckers := make([]kv.Key, len(paths)) + eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx) + for i := range paths { + i := i + eg.Go(func() error { + r, err2 := newStatsReader(egCtx, exStorage, paths[i], 250*1024) + if err2 != nil { + if err2 == io.EOF { + return nil + } + return errors.Trace(err2) + } + defer r.Close() + + moved := false + keyIdx := 0 + curKey := starts[keyIdx] + + p, err3 := r.nextProp() + for { + switch err3 { + case nil: + case io.EOF: + // fill the rest of the offsets with the last offset + currOffset := offsetsPerFile[i][keyIdx] + for keyIdx++; keyIdx < len(starts); keyIdx++ { + offsetsPerFile[i][keyIdx] = currOffset + } + return nil + default: + return errors.Trace(err3) + } + propKey := kv.Key(p.firstKey) + for propKey.Cmp(curKey) > 0 { + if !moved { + if firstKeyTooSmallCheckers[i] == nil { + firstKeyTooSmallCheckers[i] = propKey + } + } + keyIdx++ + if keyIdx >= len(starts) { + return nil + } + offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1] + curKey = starts[keyIdx] + } + moved = true + offsetsPerFile[i][keyIdx] = p.offset + p, err3 = r.nextProp() + } + }) + } + + if err = eg.Wait(); err != nil { return nil, err } - defer func() { - if err := iter.Close(); err != nil { - logger.Warn("failed to close merge prop iterator", zap.Error(err)) - } - }() - offsets4AllKey := make([][]uint64, 0, len(starts)) - offsets := make([]uint64, len(paths)) - offsets4AllKey = append(offsets4AllKey, offsets) - moved := false - keyIdx := 0 - curKey := starts[keyIdx] - for iter.Next() { - p := iter.prop() - propKey := kv.Key(p.firstKey) - for propKey.Cmp(curKey) > 0 { - if !moved { - return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s", - curKey.String(), - paths, - propKey.String(), - ) - } - keyIdx++ - if keyIdx >= len(starts) { - return offsets4AllKey, nil - } - curKey = starts[keyIdx] - newOffsets := slices.Clone(offsets) - offsets4AllKey = append(offsets4AllKey, newOffsets) - offsets = newOffsets 
+ hasNil := false + for _, k := range firstKeyTooSmallCheckers { + if k == nil { + hasNil = true + break } - moved = true - _, idx := iter.readerIndex() - offsets[idx] = p.offset } - if iter.Error() != nil { - return nil, iter.Error() + if !hasNil { + minKey := firstKeyTooSmallCheckers[0] + for _, k := range firstKeyTooSmallCheckers[1:] { + if k.Cmp(minKey) < 0 { + minKey = k + } + } + return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s", + starts[0].String(), + paths, + minKey.String(), + ) } - for len(offsets4AllKey) < len(starts) { - newOffsets := slices.Clone(offsets) - offsets4AllKey = append(offsets4AllKey, newOffsets) - offsets = newOffsets + + // TODO(lance6716): change the caller so we don't need to transpose the result + offsetsPerKey := make([][]uint64, len(starts)) + for i := range starts { + offsetsPerKey[i] = make([]uint64, len(paths)) + for j := range paths { + offsetsPerKey[i][j] = offsetsPerFile[j][i] + } } - return offsets4AllKey, nil + return offsetsPerKey, nil } // GetAllFileNames returns data file paths and stat file paths. 
Both paths are sorted. diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go index 8dc271022d..1800a27d23 100644 --- a/br/pkg/lightning/backend/external/util_test.go +++ b/br/pkg/lightning/backend/external/util_test.go @@ -71,37 +71,37 @@ func TestSeekPropsOffsets(t *testing.T) { err = w2.Close(ctx) require.NoError(t, err) - got, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5")}, []string{file1, file2}, store, true) + got, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{10, 20}}, got) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key2.6")}, []string{file1, file2}, store, true) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key2.6")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{10, 20}, {10, 20}}, got) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2}, store, true) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{30, 20}}, got) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key3")}, []string{file1, file2}, store, true) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key3")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got) - _, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store, true) + _, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store) require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]") - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store, false) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store) require.NoError(t,
err) require.Equal(t, [][]uint64{{10, 0}}, got) - _, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store, true) + _, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store) require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]") - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store, false) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{50, 40}}, got) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999"), []byte("key999")}, []string{file1, file2}, store, false) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999"), []byte("key999")}, []string{file1, file2}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{50, 40}, {50, 40}}, got) @@ -118,11 +118,11 @@ func TestSeekPropsOffsets(t *testing.T) { require.NoError(t, err) err = w4.Close(ctx) require.NoError(t, err) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2, file3, file4}, store, true) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2, file3, file4}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{30, 20, 0, 30}}, got) - got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3"), []byte("key999")}, []string{file1, file2, file3, file4}, store, true) + got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3"), []byte("key999")}, []string{file1, file2, file3, file4}, store) require.NoError(t, err) require.Equal(t, [][]uint64{{30, 20, 0, 30}, {50, 40, 0, 50}}, got) }