external: increase read chunk by one unit to avoid a whole reload (#50965)
ref pingcap/tidb#50752
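
For context, the change can be summarized with a small sketch (illustrative names and a 4 MiB unit assumed here, not TiDB's actual identifiers or constants): the per-file read concurrency is computed from stats-derived offsets, and those offsets only lower-bound the real key positions, so a plain floor division can come up one buffer unit short and force the byte reader to drop its concurrent buffers and reload the whole range; reserving one extra unit per file avoids that.

// Sketch (not TiDB code): estimate how many fixed-size read units one data
// file needs for [startKey, endKey). startOff comes from the stats index and
// only lower-bounds where startKey really sits, so the floor division alone
// can undercount by one unit; the trailing +1 keeps the range covered.
package main

import "fmt"

func chunksToCover(startOff, endOff, unit uint64) uint64 {
    return (endOff-startOff)/unit + 1
}

func main() {
    const unit = 4 << 20 // assume a 4 MiB read unit, for illustration only
    fmt.Println(chunksToCover(0, 18<<20, unit)) // 5; a bare floor would give 4
}

This mirrors the result[i] += 1 added to getFilesReadConcurrency in engine.go below.
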
br/pkg/lightning/backend/external/bench_test.go
@@ -16,6 +16,7 @@ package external

import (
    "context"
    "encoding/hex"
    "flag"
    "fmt"
    "io"
@@ -27,6 +28,7 @@ import (

    "github.com/docker/go-units"
    "github.com/felixge/fgprof"
    "github.com/pingcap/tidb/br/pkg/membuf"
    "github.com/pingcap/tidb/br/pkg/storage"
    "github.com/pingcap/tidb/pkg/kv"
    "github.com/pingcap/tidb/pkg/util/intest"
@@ -43,11 +45,17 @@ type writeTestSuite struct {
    memoryLimit        int
    beforeCreateWriter func()
    afterWriterClose   func()

    optionalFilePath string
    onClose          OnCloseFunc
}

func writePlainFile(s *writeTestSuite) {
    ctx := context.Background()
    filePath := "/test/writer"
    if s.optionalFilePath != "" {
        filePath = s.optionalFilePath
    }
    _ = s.store.DeleteFile(ctx, filePath)
    buf := make([]byte, s.memoryLimit)
    offset := 0
@@ -92,9 +100,13 @@ func cleanOldFiles(ctx context.Context, store storage.ExternalStorage, subDir st
func writeExternalFile(s *writeTestSuite) {
    ctx := context.Background()
    filePath := "/test/writer"
    if s.optionalFilePath != "" {
        filePath = s.optionalFilePath
    }
    cleanOldFiles(ctx, s.store, filePath)
    builder := NewWriterBuilder().
        SetMemorySizeLimit(uint64(s.memoryLimit))
        SetMemorySizeLimit(uint64(s.memoryLimit)).
        SetOnCloseFunc(s.onClose)

    if s.beforeCreateWriter != nil {
        s.beforeCreateWriter()
@@ -116,6 +128,9 @@ func writeExternalFile(s *writeTestSuite) {
func writeExternalOneFile(s *writeTestSuite) {
    ctx := context.Background()
    filePath := "/test/writer"
    if s.optionalFilePath != "" {
        filePath = s.optionalFilePath
    }
    cleanOldFiles(ctx, s.store, filePath)
    builder := NewWriterBuilder().
        SetMemorySizeLimit(uint64(s.memoryLimit))
@@ -126,13 +141,21 @@ func writeExternalOneFile(s *writeTestSuite) {
    writer := builder.BuildOneFile(
        s.store, filePath, "writerID")
    intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
    var minKey, maxKey []byte

    key, val, _ := s.source.next()
    minKey = key
    for key != nil {
        maxKey = key
        err := writer.WriteRow(ctx, key, val)
        intest.AssertNoError(err)
        key, val, _ = s.source.next()
    }
    intest.AssertNoError(writer.Close(ctx))
    s.onClose(&WriterSummary{
        Min: minKey,
        Max: maxKey,
    })
    if s.afterWriterClose != nil {
        s.afterWriterClose()
    }
@@ -674,3 +697,55 @@ func TestMergeBench(t *testing.T) {
    testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
    testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
}

func TestReadAllDataLargeFiles(t *testing.T) {
    ctx := context.Background()
    store := openTestingStorage(t)

    // ~ 100B * 20M = 2GB
    source := newAscendingKeyAsyncSource(20*1024*1024, 10, 90, nil)
    // ~ 1KB * 2M = 2GB
    source2 := newAscendingKeyAsyncSource(2*1024*1024, 10, 990, nil)
    var minKey, maxKey kv.Key
    recordMinMax := func(s *WriterSummary) {
        minKey = s.Min
        maxKey = s.Max
    }
    suite := &writeTestSuite{
        store:            store,
        source:           source,
        memoryLimit:      256 * 1024 * 1024,
        optionalFilePath: "/test/file",
        onClose:          recordMinMax,
    }
    suite2 := &writeTestSuite{
        store:            store,
        source:           source2,
        memoryLimit:      256 * 1024 * 1024,
        optionalFilePath: "/test/file2",
        onClose:          recordMinMax,
    }
    writeExternalOneFile(suite)
    t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
    writeExternalOneFile(suite2)
    t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)

    dataFiles, statFiles, err := GetAllFileNames(ctx, store, "")
    intest.AssertNoError(err)
    intest.Assert(len(dataFiles) == 2)

    // choose the two keys so that expected concurrency is 579 and 19
    startKey, err := hex.DecodeString("00000001000000000000")
    intest.AssertNoError(err)
    endKey, err := hex.DecodeString("00a00000000000000000")
    intest.AssertNoError(err)
    bufPool := membuf.NewPool(
        membuf.WithBlockNum(0),
        membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
    )
    output := &memKVsAndBuffers{}
    now := time.Now()
    err = readAllData(ctx, store, dataFiles, statFiles, startKey, endKey, bufPool, output)
    t.Logf("read all data cost: %s", time.Since(now))
    intest.AssertNoError(err)
}
br/pkg/lightning/backend/external/byte_reader.go
@@ -16,9 +16,11 @@ package external

import (
    "context"
    "fmt"
    "io"

    "github.com/pingcap/errors"
    "github.com/pingcap/failpoint"
    "github.com/pingcap/tidb/br/pkg/membuf"
    "github.com/pingcap/tidb/br/pkg/storage"
    "github.com/pingcap/tidb/pkg/util/logutil"
@@ -325,6 +327,11 @@ func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int)
        zap.Int("dropBytes", r.concurrentReader.bufSizePerConc*(len(r.curBuf)-r.curBufIdx)-r.curBufOffset),
        zap.Int("curBufIdx", r.curBufIdx),
    )
    failpoint.Inject("assertReloadAtMostOnce", func() {
        if r.concurrentReader.reloadCnt > 1 {
            panic(fmt.Sprintf("reloadCnt is %d", r.concurrentReader.reloadCnt))
        }
    })
    r.concurrentReader.largeBufferPool.Destroy()
    r.concurrentReader.largeBuf = nil
    r.concurrentReader.now = false
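
The failpoint.Inject marker added above is inert until a test enables it. A minimal sketch of that pattern follows, with a hypothetical package and failpoint name; note that in TiDB the Inject markers are rewritten by failpoint-ctl (e.g. make failpoint-enable) before failpoint.Enable can take effect.

// Sketch (not TiDB code): a failpoint-guarded assertion in library code.
package sketch

import (
    "fmt"

    "github.com/pingcap/failpoint"
)

// reloadCnt is bumped by the code path we want to guard in tests.
var reloadCnt int

func reload() {
    reloadCnt++
    // No-op in normal builds; after failpoint-ctl rewriting, the body runs
    // only while a test has enabled the "mustNotReloadTwice" failpoint.
    failpoint.Inject("mustNotReloadTwice", func() {
        if reloadCnt > 1 {
            panic(fmt.Sprintf("reloadCnt is %d", reloadCnt))
        }
    })
}

A test then brackets the code under test with failpoint.Enable("<import path>/mustNotReloadTwice", "return()") and a deferred failpoint.Disable, which is exactly what the new TestReadLargeFile in reader_test.go below does with assertReloadAtMostOnce.
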
br/pkg/lightning/backend/external/engine.go
@@ -223,15 +223,15 @@ func getFilesReadConcurrency(
    startOffs, endOffs := offsets[0], offsets[1]
    for i := range statsFiles {
        result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
        result[i] = max(result[i], 1)
        if result[i] > 1 {
            logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
                zap.String("filename", statsFiles[i]),
                zap.Uint64("startOffset", startOffs[i]),
                zap.Uint64("endOffset", endOffs[i]),
                zap.Uint64("expected concurrency", result[i]),
            )
        }
        // let the stat internals cover the [startKey, endKey) since seekPropsOffsets
        // always return an offset that is less than or equal to the key.
        result[i] += 1
        logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
            zap.String("filename", statsFiles[i]),
            zap.Uint64("startOffset", startOffs[i]),
            zap.Uint64("endOffset", endOffs[i]),
            zap.Uint64("expected concurrency", result[i]),
        )
    }
    return result, startOffs, nil
}
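
To make the behavioral difference in getFilesReadConcurrency concrete, here is a small worked comparison under an assumed 4 MiB value for ConcurrentReaderBufferSizePerConc (illustration only; the real constant is defined elsewhere in this package): the new estimate is always one unit larger than the old one, which is what keeps the stat-derived interval covering [startKey, endKey).

// Sketch (not TiDB code): old vs. new per-file concurrency for a few spans.
package main

import "fmt"

func main() {
    const unit uint64 = 4 << 20 // assumed 4 MiB read unit, for illustration

    for _, span := range []uint64{1 << 20, 18 << 20, 2 << 30} {
        oldConc := max(span/unit, 1) // old: floor of the span, clamped to 1 (Go 1.21+ builtin max)
        newConc := span/unit + 1     // new: always reserve one extra unit
        fmt.Printf("span=%d MiB old=%d new=%d\n", span>>20, oldConc, newConc)
    }
}
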
br/pkg/lightning/backend/external/reader_test.go
@@ -22,8 +22,10 @@ import (
    "testing"
    "time"

    "github.com/pingcap/failpoint"
    "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
    "github.com/pingcap/tidb/br/pkg/lightning/common"
    "github.com/pingcap/tidb/br/pkg/membuf"
    "github.com/pingcap/tidb/br/pkg/storage"
    "github.com/pingcap/tidb/pkg/util/size"
    "github.com/stretchr/testify/require"
@@ -106,3 +108,48 @@ func TestReadAllOneFile(t *testing.T) {

    testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit)
}

func TestReadLargeFile(t *testing.T) {
    ctx := context.Background()
    memStore := storage.NewMemStorage()
    backup := ConcurrentReaderBufferSizePerConc
    t.Cleanup(func() {
        ConcurrentReaderBufferSizePerConc = backup
    })
    ConcurrentReaderBufferSizePerConc = 512 * 1024

    w := NewWriterBuilder().
        SetPropSizeDistance(128*1024).
        SetPropKeysDistance(1000).
        BuildOneFile(memStore, "/test", "0")

    require.NoError(t, w.Init(ctx, int64(5*size.MB)))

    val := make([]byte, 10000)
    for i := 0; i < 10000; i++ {
        key := []byte(fmt.Sprintf("key%06d", i))
        require.NoError(t, w.WriteRow(ctx, key, val))
    }
    require.NoError(t, w.Close(ctx))

    datas, stats, err := GetAllFileNames(ctx, memStore, "")
    require.NoError(t, err)
    require.Len(t, datas, 1)

    failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce", "return()")
    defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce")

    bufPool := membuf.NewPool(
        membuf.WithBlockNum(0),
        membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
    )
    output := &memKVsAndBuffers{}
    startKey := []byte("key000000")
    maxKey := []byte("key004998")
    endKey := []byte("key004999")
    err = readAllData(ctx, memStore, datas, stats, startKey, endKey, bufPool, output)
    require.NoError(t, err)
    output.build(ctx)
    require.Equal(t, startKey, output.keys[0])
    require.Equal(t, maxKey, output.keys[len(output.keys)-1])
}