diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go
index 4aa42ebab2..66144569ef 100644
--- a/br/pkg/lightning/backend/external/bench_test.go
+++ b/br/pkg/lightning/backend/external/bench_test.go
@@ -16,6 +16,7 @@ package external
 
 import (
 	"context"
+	"encoding/hex"
 	"flag"
 	"fmt"
 	"io"
@@ -27,6 +28,7 @@ import (
 
 	"github.com/docker/go-units"
 	"github.com/felixge/fgprof"
+	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/util/intest"
@@ -43,11 +45,17 @@ type writeTestSuite struct {
 	memoryLimit        int
 	beforeCreateWriter func()
 	afterWriterClose   func()
+
+	optionalFilePath string
+	onClose          OnCloseFunc
 }
 
 func writePlainFile(s *writeTestSuite) {
 	ctx := context.Background()
 	filePath := "/test/writer"
+	if s.optionalFilePath != "" {
+		filePath = s.optionalFilePath
+	}
 	_ = s.store.DeleteFile(ctx, filePath)
 	buf := make([]byte, s.memoryLimit)
 	offset := 0
@@ -92,9 +100,13 @@ func cleanOldFiles(ctx context.Context, store storage.ExternalStorage, subDir st
 func writeExternalFile(s *writeTestSuite) {
 	ctx := context.Background()
 	filePath := "/test/writer"
+	if s.optionalFilePath != "" {
+		filePath = s.optionalFilePath
+	}
 	cleanOldFiles(ctx, s.store, filePath)
 	builder := NewWriterBuilder().
-		SetMemorySizeLimit(uint64(s.memoryLimit))
+		SetMemorySizeLimit(uint64(s.memoryLimit)).
+		SetOnCloseFunc(s.onClose)
 
 	if s.beforeCreateWriter != nil {
 		s.beforeCreateWriter()
@@ -116,6 +128,9 @@ func writeExternalFile(s *writeTestSuite) {
 func writeExternalOneFile(s *writeTestSuite) {
 	ctx := context.Background()
 	filePath := "/test/writer"
+	if s.optionalFilePath != "" {
+		filePath = s.optionalFilePath
+	}
 	cleanOldFiles(ctx, s.store, filePath)
 	builder := NewWriterBuilder().
 		SetMemorySizeLimit(uint64(s.memoryLimit))
@@ -126,13 +141,21 @@ func writeExternalOneFile(s *writeTestSuite) {
 	writer := builder.BuildOneFile(
 		s.store, filePath, "writerID")
 	intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
+	var minKey, maxKey []byte
+
 	key, val, _ := s.source.next()
+	minKey = key
 	for key != nil {
+		maxKey = key
 		err := writer.WriteRow(ctx, key, val)
 		intest.AssertNoError(err)
 		key, val, _ = s.source.next()
 	}
 	intest.AssertNoError(writer.Close(ctx))
+	s.onClose(&WriterSummary{
+		Min: minKey,
+		Max: maxKey,
+	})
 	if s.afterWriterClose != nil {
 		s.afterWriterClose()
 	}
@@ -674,3 +697,55 @@ func TestMergeBench(t *testing.T) {
 	testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
 	testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
 }
+
+func TestReadAllDataLargeFiles(t *testing.T) {
+	ctx := context.Background()
+	store := openTestingStorage(t)
+
+	// ~ 100B * 20M = 2GB
+	source := newAscendingKeyAsyncSource(20*1024*1024, 10, 90, nil)
+	// ~ 1KB * 2M = 2GB
+	source2 := newAscendingKeyAsyncSource(2*1024*1024, 10, 990, nil)
+	var minKey, maxKey kv.Key
+	recordMinMax := func(s *WriterSummary) {
+		minKey = s.Min
+		maxKey = s.Max
+	}
+	suite := &writeTestSuite{
+		store:            store,
+		source:           source,
+		memoryLimit:      256 * 1024 * 1024,
+		optionalFilePath: "/test/file",
+		onClose:          recordMinMax,
+	}
+	suite2 := &writeTestSuite{
+		store:            store,
+		source:           source2,
+		memoryLimit:      256 * 1024 * 1024,
+		optionalFilePath: "/test/file2",
+		onClose:          recordMinMax,
+	}
+	writeExternalOneFile(suite)
+	t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
+	writeExternalOneFile(suite2)
+	t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
+
+	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "")
+	intest.AssertNoError(err)
+	intest.Assert(len(dataFiles) == 2)
+
+	// choose the two keys so that expected concurrency is 579 and 19
+	startKey, err := hex.DecodeString("00000001000000000000")
+	intest.AssertNoError(err)
+	endKey, err := hex.DecodeString("00a00000000000000000")
+	intest.AssertNoError(err)
+	bufPool := membuf.NewPool(
+		membuf.WithBlockNum(0),
+		membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
+	)
+	output := &memKVsAndBuffers{}
+	now := time.Now()
+	err = readAllData(ctx, store, dataFiles, statFiles, startKey, endKey, bufPool, output)
+	t.Logf("read all data cost: %s", time.Since(now))
+	intest.AssertNoError(err)
+}
diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go
index 769f1a6932..de93012060 100644
--- a/br/pkg/lightning/backend/external/byte_reader.go
+++ b/br/pkg/lightning/backend/external/byte_reader.go
@@ -16,9 +16,11 @@ package external
 
 import (
 	"context"
+	"fmt"
 	"io"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/util/logutil"
@@ -325,6 +327,11 @@ func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int)
 		zap.Int("dropBytes", r.concurrentReader.bufSizePerConc*(len(r.curBuf)-r.curBufIdx)-r.curBufOffset),
 		zap.Int("curBufIdx", r.curBufIdx),
 	)
+	failpoint.Inject("assertReloadAtMostOnce", func() {
+		if r.concurrentReader.reloadCnt > 1 {
+			panic(fmt.Sprintf("reloadCnt is %d", r.concurrentReader.reloadCnt))
+		}
+	})
 	r.concurrentReader.largeBufferPool.Destroy()
 	r.concurrentReader.largeBuf = nil
 	r.concurrentReader.now = false
diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
index 829f854461..cafe9cecff 100644
--- a/br/pkg/lightning/backend/external/engine.go
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -223,15 +223,15 @@ func getFilesReadConcurrency(
 	startOffs, endOffs := offsets[0], offsets[1]
 	for i := range statsFiles {
 		result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
-		result[i] = max(result[i], 1)
-		if result[i] > 1 {
-			logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
-				zap.String("filename", statsFiles[i]),
-				zap.Uint64("startOffset", startOffs[i]),
-				zap.Uint64("endOffset", endOffs[i]),
-				zap.Uint64("expected concurrency", result[i]),
-			)
-		}
+		// let the stat intervals cover the [startKey, endKey) since seekPropsOffsets
+		// always returns an offset that is less than or equal to the key.
+		result[i] += 1
+		logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
+			zap.String("filename", statsFiles[i]),
+			zap.Uint64("startOffset", startOffs[i]),
+			zap.Uint64("endOffset", endOffs[i]),
+			zap.Uint64("expected concurrency", result[i]),
+		)
 	}
 	return result, startOffs, nil
 }
diff --git a/br/pkg/lightning/backend/external/reader_test.go b/br/pkg/lightning/backend/external/reader_test.go
index 329ac48a47..12f6e5151f 100644
--- a/br/pkg/lightning/backend/external/reader_test.go
+++ b/br/pkg/lightning/backend/external/reader_test.go
@@ -22,8 +22,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
+	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/util/size"
 	"github.com/stretchr/testify/require"
@@ -106,3 +108,48 @@ func TestReadAllOneFile(t *testing.T) {
 
 	testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit)
 }
+
+func TestReadLargeFile(t *testing.T) {
+	ctx := context.Background()
+	memStore := storage.NewMemStorage()
+	backup := ConcurrentReaderBufferSizePerConc
+	t.Cleanup(func() {
+		ConcurrentReaderBufferSizePerConc = backup
+	})
+	ConcurrentReaderBufferSizePerConc = 512 * 1024
+
+	w := NewWriterBuilder().
+		SetPropSizeDistance(128*1024).
+		SetPropKeysDistance(1000).
+		BuildOneFile(memStore, "/test", "0")
+
+	require.NoError(t, w.Init(ctx, int64(5*size.MB)))
+
+	val := make([]byte, 10000)
+	for i := 0; i < 10000; i++ {
+		key := []byte(fmt.Sprintf("key%06d", i))
+		require.NoError(t, w.WriteRow(ctx, key, val))
+	}
+	require.NoError(t, w.Close(ctx))
+
+	datas, stats, err := GetAllFileNames(ctx, memStore, "")
+	require.NoError(t, err)
+	require.Len(t, datas, 1)
+
+	failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce", "return()")
+	defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce")
+
+	bufPool := membuf.NewPool(
+		membuf.WithBlockNum(0),
+		membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
+	)
+	output := &memKVsAndBuffers{}
+	startKey := []byte("key000000")
+	maxKey := []byte("key004998")
+	endKey := []byte("key004999")
+	err = readAllData(ctx, memStore, datas, stats, startKey, endKey, bufPool, output)
+	require.NoError(t, err)
+	output.build(ctx)
+	require.Equal(t, startKey, output.keys[0])
+	require.Equal(t, maxKey, output.keys[len(output.keys)-1])
+}
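
Two notes on the patch above. First, the `getFilesReadConcurrency` change: `seekPropsOffsets` returns, for each boundary key, a stat-property offset that is less than or equal to that key's position, so `endOffs[i] - startOffs[i]` can undercount the bytes that actually hold `[startKey, endKey)` by up to one property interval. Unconditionally adding 1 compensates for that and subsumes the old `max(result[i], 1)` floor. A minimal sketch of the arithmetic, assuming an illustrative 4 MiB per-reader buffer (the helper name and the offsets below are made up, not from the patch):

```go
package main

import "fmt"

// estimateConcurrency mirrors the patched loop body: one concurrent reader
// per buffer-sized chunk between the seeked stat offsets, plus one so the
// trailing partial interval that still contains endKey is covered.
func estimateConcurrency(startOff, endOff, bufSizePerConc uint64) uint64 {
	return (endOff-startOff)/bufSizePerConc + 1
}

func main() {
	// Illustrative stand-in for ConcurrentReaderBufferSizePerConc.
	const bufSize uint64 = 4 << 20

	fmt.Println(estimateConcurrency(0, 2312<<20, bufSize)) // 579, a hotspot file
	fmt.Println(estimateConcurrency(0, 72<<20, bufSize))   // 19
	fmt.Println(estimateConcurrency(100, 100, bufSize))    // 1: the old max(x, 1) floor is preserved
}
```

This is also why the log line is no longer gated on `result[i] > 1`: every file now gets at least one extra reader, so the offsets are logged unconditionally.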
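Second, the `assertReloadAtMostOnce` failpoint is what ties the byte_reader.go change to `TestReadLargeFile`: the hot path carries an inert `failpoint.Inject` marker, and the test switches it on by fully qualified package path with `failpoint.Enable(..., "return()")`. A self-contained sketch of that pattern follows; the package path, counter, and test name are made up, and note that `Inject` bodies only run in builds where failpoint-ctl has rewritten the markers (as TiDB's build tooling does):

```go
package demo

import (
	"fmt"
	"testing"

	"github.com/pingcap/failpoint"
)

// reloadCnt stands in for byteReader.concurrentReader.reloadCnt.
var reloadCnt int

// closeReader mimics the shape of the patched closeConcurrentReader: the
// Inject body is a no-op unless the named failpoint has been enabled.
func closeReader() {
	failpoint.Inject("assertReloadAtMostOnce", func() {
		if reloadCnt > 1 {
			panic(fmt.Sprintf("reloadCnt is %d", reloadCnt))
		}
	})
	// ... the real code releases the large concurrent-read buffers here ...
}

func TestReloadAtMostOnce(t *testing.T) {
	// Enable by "<package path>/<failpoint name>"; "return()" makes Inject fire.
	if err := failpoint.Enable("example.com/demo/assertReloadAtMostOnce", "return()"); err != nil {
		t.Fatal(err)
	}
	defer failpoint.Disable("example.com/demo/assertReloadAtMostOnce")

	reloadCnt = 1
	closeReader() // passes; with reloadCnt > 1 the assertion would panic
}
```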