lightning/external: add statistic for multiple data files (#46614)
ref pingcap/tidb#45719
br/pkg/lightning/backend/external/BUILD.bazel

@@ -53,7 +53,7 @@ go_test(
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 28,
+    shard_count = 31,
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
br/pkg/lightning/backend/external/util.go (48 changed lines)
@@ -19,6 +19,7 @@ import (
     "context"
     "fmt"
     "path/filepath"
+    "slices"
     "sort"
     "strings"

@@ -199,3 +200,50 @@ func MockExternalEngineWithWriter(
     }
     return GetAllFileNames(ctx, storage, subDir)
 }
+
+// EndpointTp is the type of Endpoint.Tp.
+type EndpointTp int
+
+const (
+    // ExclusiveEnd represents "..., Endpoint.Key)".
+    ExclusiveEnd EndpointTp = iota
+    // InclusiveStart represents "[Endpoint.Key, ...".
+    InclusiveStart
+    // InclusiveEnd represents "..., Endpoint.Key]".
+    InclusiveEnd
+)
+
+// Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.
+type Endpoint struct {
+    Key    []byte
+    Tp     EndpointTp
+    Weight uint64 // all EndpointTp use positive weight
+}
+
+// GetMaxOverlapping returns the maximum overlapping weight treating given
+// `points` as endpoints of intervals. `points` are not required to be sorted,
+// and will be sorted in-place in this function.
+func GetMaxOverlapping(points []Endpoint) int {
+    slices.SortFunc(points, func(i, j Endpoint) int {
+        if cmp := bytes.Compare(i.Key, j.Key); cmp != 0 {
+            return cmp
+        }
+        return int(i.Tp) - int(j.Tp)
+    })
+    var maxWeight uint64
+    var curWeight uint64
+    for _, p := range points {
+        switch p.Tp {
+        case InclusiveStart:
+            curWeight += p.Weight
+        case ExclusiveEnd:
+            curWeight -= p.Weight
+        case InclusiveEnd:
+            curWeight -= p.Weight
+        }
+        if curWeight > maxWeight {
+            maxWeight = curWeight
+        }
+    }
+    return int(maxWeight)
+}
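For illustration only (not part of the commit): a minimal, self-contained sketch of calling the new GetMaxOverlapping. The import path follows the repository layout; the sample keys and weights are made up. Two half-open ranges [a, c) and [b, d) overlap on [b, c), so the expected result is 2.

package main

import (
    "fmt"

    "github.com/pingcap/tidb/br/pkg/lightning/backend/external"
)

func main() {
    // Endpoints of [a, c) and [b, d); order does not matter because
    // GetMaxOverlapping sorts the slice in place.
    points := []external.Endpoint{
        {Key: []byte("a"), Tp: external.InclusiveStart, Weight: 1},
        {Key: []byte("c"), Tp: external.ExclusiveEnd, Weight: 1},
        {Key: []byte("b"), Tp: external.InclusiveStart, Weight: 1},
        {Key: []byte("d"), Tp: external.ExclusiveEnd, Weight: 1},
    }
    fmt.Println(external.GetMaxOverlapping(points)) // 2
}

Because the sort breaks ties by EndpointTp (ExclusiveEnd before InclusiveStart before InclusiveEnd), ranges that merely touch at a key, such as [a, b) and [b, c), are not counted as overlapping, while closed ranges [a, b] and [b, c] are. The tests below exercise both cases.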
br/pkg/lightning/backend/external/util_test.go (31 changed lines)
@@ -209,3 +209,34 @@ func TestCleanUpFiles(t *testing.T) {
     require.Equal(t, []string(nil), statFiles)
     require.Equal(t, []string(nil), dataFiles)
 }
+
+func TestGetMaxOverlapping(t *testing.T) {
+    // [1, 3), [2, 4)
+    points := []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 2, GetMaxOverlapping(points))
+    // [1, 3), [2, 4), [3, 5)
+    points = []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{5}, Tp: ExclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 2, GetMaxOverlapping(points))
+    // [1, 3], [2, 4], [3, 5]
+    points = []Endpoint{
+        {Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveEnd, Weight: 1},
+        {Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{4}, Tp: InclusiveEnd, Weight: 1},
+        {Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+        {Key: []byte{5}, Tp: InclusiveEnd, Weight: 1},
+    }
+    require.Equal(t, 3, GetMaxOverlapping(points))
+}
br/pkg/lightning/backend/external/writer.go (110 changed lines)
@@ -36,6 +36,8 @@ import (
     "go.uber.org/zap"
 )

+var multiFileStatNum = 500
+
 // rangePropertiesCollector collects range properties for each range. The zero
 // value of rangePropertiesCollector is not ready to use, should call reset()
 // first.
@@ -59,11 +61,12 @@ func (rc *rangePropertiesCollector) encode() []byte {

 // WriterSummary is the summary of a writer.
 type WriterSummary struct {
-    WriterID  int
-    Seq       int
-    Min       tidbkv.Key
-    Max       tidbkv.Key
-    TotalSize uint64
+    WriterID           int
+    Seq                int
+    Min                tidbkv.Key
+    Max                tidbkv.Key
+    TotalSize          uint64
+    MultipleFilesStats []MultipleFilesStat
 }

 // OnCloseFunc is the callback function when a writer is closed.
@@ -155,7 +158,7 @@ func (b *WriterBuilder) Build(
     if b.dupeDetectEnabled {
         keyAdapter = common.DupDetectKeyAdapter{}
     }
-    return &Writer{
+    ret := &Writer{
         rc: &rangePropertiesCollector{
             props:    make([]*rangeProperty, 0, 1024),
             currProp: &rangeProperty{},
@@ -173,7 +176,47 @@ func (b *WriterBuilder) Build(
         kvStore:        nil,
         onClose:        b.onClose,
         closed:         false,
+        multiFileStats: make([]MultipleFilesStat, 1),
+        fileMinKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
+        fileMaxKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
     }
+    ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum)
+    return ret
 }

+// MultipleFilesStat is the statistic information of multiple files (currently
+// every 500 files). It is used to estimate the data overlapping, since
+// per-file statistic information may be too large to load into memory.
+type MultipleFilesStat struct {
+    MinKey            tidbkv.Key
+    MaxKey            tidbkv.Key
+    Filenames         [][2]string // [dataFile, statFile]
+    MaxOverlappingNum int
+}
+
+func (m *MultipleFilesStat) build(startKeys, endKeys []tidbkv.Key) {
+    if len(startKeys) == 0 {
+        return
+    }
+    m.MinKey = startKeys[0]
+    m.MaxKey = endKeys[0]
+    for i := 1; i < len(startKeys); i++ {
+        if m.MinKey.Cmp(startKeys[i]) > 0 {
+            m.MinKey = startKeys[i]
+        }
+        if m.MaxKey.Cmp(endKeys[i]) < 0 {
+            m.MaxKey = endKeys[i]
+        }
+    }
+
+    points := make([]Endpoint, 0, len(startKeys)*2)
+    for _, k := range startKeys {
+        points = append(points, Endpoint{Key: k, Tp: InclusiveStart, Weight: 1})
+    }
+    for _, k := range endKeys {
+        points = append(points, Endpoint{Key: k, Tp: InclusiveEnd, Weight: 1})
+    }
+    m.MaxOverlappingNum = GetMaxOverlapping(points)
+}
+
 // Writer is used to write data into external storage.
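A note on the Weight field, as a hedged sketch rather than anything in this commit: because GetMaxOverlapping sums weights instead of counting intervals, the same sweep could later be reused at group granularity, for example by treating each MultipleFilesStat's [MinKey, MaxKey] range as one interval weighted by its own MaxOverlappingNum. A hypothetical in-package helper (the name estimateGlobalOverlap is illustrative only):

func estimateGlobalOverlap(stats []MultipleFilesStat) int {
    // Each group contributes its key range, weighted by the worst overlap
    // observed inside the group, giving a pessimistic global estimate.
    points := make([]Endpoint, 0, len(stats)*2)
    for _, s := range stats {
        w := uint64(s.MaxOverlappingNum)
        points = append(points, Endpoint{Key: s.MinKey, Tp: InclusiveStart, Weight: w})
        points = append(points, Endpoint{Key: s.MaxKey, Tp: InclusiveEnd, Weight: w})
    }
    return GetMaxOverlapping(points)
}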
@@ -198,6 +241,11 @@ type Writer struct {
     // Statistic information per batch.
     batchSize uint64

+    // Statistic information per 500 batches.
+    multiFileStats []MultipleFilesStat
+    fileMinKeys    []tidbkv.Key
+    fileMaxKeys    []tidbkv.Key
+
     // Statistic information per writer.
     minKey tidbkv.Key
     maxKey tidbkv.Key
@@ -218,7 +266,7 @@ func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) e

         w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
         if w.batchSize >= w.memSizeLimit {
-            if err := w.flushKVs(ctx); err != nil {
+            if err := w.flushKVs(ctx, false); err != nil {
                 return err
             }
         }
@@ -238,10 +286,12 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
     }
     w.closed = true
     defer w.kvBuffer.Destroy()
-    err := w.flushKVs(ctx)
+    err := w.flushKVs(ctx, true)
     if err != nil {
         return status(false), err
     }
+    // remove the trailing empty MultipleFilesStat
+    w.multiFileStats = w.multiFileStats[:len(w.multiFileStats)-1]

     logutil.Logger(ctx).Info("close writer",
         zap.Int("writerID", w.writerID),
@@ -251,11 +301,12 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
     w.writeBatch = nil

     w.onClose(&WriterSummary{
-        WriterID:  w.writerID,
-        Seq:       w.currentSeq,
-        Min:       w.minKey,
-        Max:       w.maxKey,
-        TotalSize: w.totalSize,
+        WriterID:           w.writerID,
+        Seq:                w.currentSeq,
+        Min:                w.minKey,
+        Max:                w.maxKey,
+        TotalSize:          w.totalSize,
+        MultipleFilesStats: w.multiFileStats,
     })
     return status(true), nil
 }
@@ -277,13 +328,13 @@ func (s status) Flushed() bool {
     return bool(s)
 }

-func (w *Writer) flushKVs(ctx context.Context) (err error) {
+func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
     if len(w.writeBatch) == 0 {
         return nil
     }

     logger := logutil.Logger(ctx)
-    dataWriter, statWriter, err := w.createStorageWriter(ctx)
+    dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
     if err != nil {
         return err
     }
@@ -339,6 +390,23 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {

     w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize)

+    // maintain 500-batch statistics
+
+    l := len(w.multiFileStats)
+    w.multiFileStats[l-1].Filenames = append(w.multiFileStats[l-1].Filenames,
+        [2]string{dataFile, statFile},
+    )
+    w.fileMinKeys = append(w.fileMinKeys, tidbkv.Key(w.writeBatch[0].Key).Clone())
+    w.fileMaxKeys = append(w.fileMaxKeys, tidbkv.Key(w.writeBatch[len(w.writeBatch)-1].Key).Clone())
+    if fromClose || len(w.multiFileStats[l-1].Filenames) == multiFileStatNum {
+        w.multiFileStats[l-1].build(w.fileMinKeys, w.fileMaxKeys)
+        w.multiFileStats = append(w.multiFileStats, MultipleFilesStat{
+            Filenames: make([][2]string, 0, multiFileStatNum),
+        })
+        w.fileMinKeys = w.fileMinKeys[:0]
+        w.fileMaxKeys = w.fileMaxKeys[:0]
+    }
+
     w.writeBatch = w.writeBatch[:0]
     w.rc.reset()
     w.kvBuffer.Reset()
@@ -347,16 +415,20 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
     return nil
 }

-func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.ExternalFileWriter, err error) {
+func (w *Writer) createStorageWriter(ctx context.Context) (
+    dataFile, statFile string,
+    data, stats storage.ExternalFileWriter,
+    err error,
+) {
     dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq))
     dataWriter, err := w.store.Create(ctx, dataPath, nil)
     if err != nil {
-        return nil, nil, err
+        return "", "", nil, nil, err
     }
     statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq))
     statsWriter, err := w.store.Create(ctx, statPath, nil)
     if err != nil {
-        return nil, nil, err
+        return "", "", nil, nil, err
     }
-    return dataWriter, statsWriter, nil
+    return dataPath, statPath, dataWriter, statsWriter, nil
 }
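Before the test file below, a minimal sketch of how a caller can pick up the new statistics through the existing OnCloseFunc hook. This fragment is assumed to live inside the external package (or its test); the collected variable and the "/prefix" path are illustrative only, not from this commit:

var collected []MultipleFilesStat
writer := NewWriterBuilder().
    SetOnCloseFunc(func(s *WriterSummary) {
        // One MultipleFilesStat is reported per group of up to
        // multiFileStatNum flushed data/stat file pairs.
        collected = append(collected, s.MultipleFilesStats...)
    }).
    Build(storage.NewMemStorage(), "/prefix", 0)
// ... AppendRows as usual; Close triggers the callback above.
_, _ = writer.Close(context.Background())

TestWriterMultiFileStat below drives exactly this path and also checks the produced file names, which pair each data file with its stat file ("/test/0/0" with "/test/0_stat/0", and so on).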
br/pkg/lightning/backend/external/writer_test.go (119 changed lines)
@@ -212,3 +212,122 @@ func TestWriterDuplicateDetect(t *testing.T) {
     require.Error(t, err)
     require.Contains(t, err.Error(), "found duplicate key")
 }
+
+func TestMultiFileStat(t *testing.T) {
+    s := &MultipleFilesStat{}
+    // [3, 5], [1, 3], [2, 4]
+    startKeys := []dbkv.Key{{3}, {1}, {2}}
+    endKeys := []dbkv.Key{{5}, {3}, {4}}
+    s.build(startKeys, endKeys)
+    require.EqualValues(t, []byte{1}, s.MinKey)
+    require.EqualValues(t, []byte{5}, s.MaxKey)
+    require.EqualValues(t, 3, s.MaxOverlappingNum)
+}
+
+func TestWriterMultiFileStat(t *testing.T) {
+    oldMultiFileStatNum := multiFileStatNum
+    t.Cleanup(func() {
+        multiFileStatNum = oldMultiFileStatNum
+    })
+    multiFileStatNum = 3
+
+    ctx := context.Background()
+    memStore := storage.NewMemStorage()
+    var summary *WriterSummary
+
+    writer := NewWriterBuilder().
+        SetPropKeysDistance(2).
+        SetMemorySizeLimit(20). // 2 KV pairs will trigger a flush
+        SetOnCloseFunc(func(s *WriterSummary) {
+            summary = s
+        }).
+        Build(memStore, "/test", 0)
+
+    kvs := make([]common.KvPair, 0, 18)
+    // [key01, key02], [key03, key04], [key05, key06]
+    for i := 1; i <= 6; i++ {
+        kvs = append(kvs, common.KvPair{
+            Key: []byte(fmt.Sprintf("key%02d", i)),
+            Val: []byte("56789"),
+        })
+    }
+    // [key11, key13], [key12, key15], [key14, key16]
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key11"),
+        Val: []byte("56789"),
+    })
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key13"),
+        Val: []byte("56789"),
+    })
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key12"),
+        Val: []byte("56789"),
+    })
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key15"),
+        Val: []byte("56789"),
+    })
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key14"),
+        Val: []byte("56789"),
+    })
+    kvs = append(kvs, common.KvPair{
+        Key: []byte("key16"),
+        Val: []byte("56789"),
+    })
+    // [key20, key22], [key21, key23], [key22, key24]
+    for i := 0; i < 3; i++ {
+        kvs = append(kvs, common.KvPair{
+            Key: []byte(fmt.Sprintf("key2%d", i)),
+            Val: []byte("56789"),
+        })
+        kvs = append(kvs, common.KvPair{
+            Key: []byte(fmt.Sprintf("key2%d", i+2)),
+            Val: []byte("56789"),
+        })
+    }
+
+    rows := kv.MakeRowsFromKvPairs(kvs)
+    err := writer.AppendRows(ctx, nil, rows)
+    require.NoError(t, err)
+    _, err = writer.Close(ctx)
+    require.NoError(t, err)
+
+    require.Equal(t, 3, len(summary.MultipleFilesStats))
+    expected := MultipleFilesStat{
+        MinKey: []byte("key01"),
+        MaxKey: []byte("key06"),
+        Filenames: [][2]string{
+            {"/test/0/0", "/test/0_stat/0"},
+            {"/test/0/1", "/test/0_stat/1"},
+            {"/test/0/2", "/test/0_stat/2"},
+        },
+        MaxOverlappingNum: 1,
+    }
+    require.Equal(t, expected, summary.MultipleFilesStats[0])
+    expected = MultipleFilesStat{
+        MinKey: []byte("key11"),
+        MaxKey: []byte("key16"),
+        Filenames: [][2]string{
+            {"/test/0/3", "/test/0_stat/3"},
+            {"/test/0/4", "/test/0_stat/4"},
+            {"/test/0/5", "/test/0_stat/5"},
+        },
+        MaxOverlappingNum: 2,
+    }
+    require.Equal(t, expected, summary.MultipleFilesStats[1])
+    expected = MultipleFilesStat{
+        MinKey: []byte("key20"),
+        MaxKey: []byte("key24"),
+        Filenames: [][2]string{
+            {"/test/0/6", "/test/0_stat/6"},
+            {"/test/0/7", "/test/0_stat/7"},
+            {"/test/0/8", "/test/0_stat/8"},
+        },
+        MaxOverlappingNum: 3,
+    }
+    require.Equal(t, expected, summary.MultipleFilesStats[2])
+    require.EqualValues(t, "key01", summary.Min)
+    require.EqualValues(t, "key24", summary.Max)
+}