diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel
index a01c8e9664..7776bc9ddd 100644
--- a/br/pkg/lightning/backend/external/BUILD.bazel
+++ b/br/pkg/lightning/backend/external/BUILD.bazel
@@ -53,7 +53,7 @@ go_test(
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 28,
+    shard_count = 31,
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go
index ca3deb5d52..ee76b8b800 100644
--- a/br/pkg/lightning/backend/external/util.go
+++ b/br/pkg/lightning/backend/external/util.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"fmt"
 	"path/filepath"
+	"slices"
 	"sort"
 	"strings"
 
@@ -199,3 +200,50 @@ func MockExternalEngineWithWriter(
 	}
 	return GetAllFileNames(ctx, storage, subDir)
 }
+
+// EndpointTp describes which kind of interval bound Endpoint.Key marks.
+type EndpointTp int
+
+const (
+	// ExclusiveEnd represents "..., Endpoint.Key)".
+	ExclusiveEnd EndpointTp = iota
+	// InclusiveStart represents "[Endpoint.Key, ...".
+	InclusiveStart
+	// InclusiveEnd represents "..., Endpoint.Key]".
+	InclusiveEnd
+)
+
+// Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.
+type Endpoint struct {
+	Key    []byte
+	Tp     EndpointTp
+	Weight uint64 // all EndpointTp use positive weight
+}
+
+// GetMaxOverlapping returns the maximum total weight of overlapping intervals,
+// treating the given `points` as their endpoints. `points` are not required to
+// be sorted; they are sorted in place by this function.
+func GetMaxOverlapping(points []Endpoint) int {
+	slices.SortFunc(points, func(i, j Endpoint) int {
+		if cmp := bytes.Compare(i.Key, j.Key); cmp != 0 {
+			return cmp
+		}
+		return int(i.Tp) - int(j.Tp)
+	})
+	var maxWeight uint64
+	var curWeight uint64
+	for _, p := range points {
+		switch p.Tp {
+		case InclusiveStart:
+			curWeight += p.Weight
+		case ExclusiveEnd:
+			curWeight -= p.Weight
+		case InclusiveEnd:
+			curWeight -= p.Weight
+		}
+		if curWeight > maxWeight {
+			maxWeight = curWeight
+		}
+	}
+	return int(maxWeight)
+}
diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go
index 379ccd7060..7c6678d9d6 100644
--- a/br/pkg/lightning/backend/external/util_test.go
+++ b/br/pkg/lightning/backend/external/util_test.go
@@ -209,3 +209,34 @@ func TestCleanUpFiles(t *testing.T) {
 	require.Equal(t, []string(nil), statFiles)
 	require.Equal(t, []string(nil), dataFiles)
 }
+
+func TestGetMaxOverlapping(t *testing.T) {
+	// [1, 3), [2, 4)
+	points := []Endpoint{
+		{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+		{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+	}
+	require.Equal(t, 2, GetMaxOverlapping(points))
+	// [1, 3), [2, 4), [3, 5)
+	points = []Endpoint{
+		{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
+		{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
+		{Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{5}, Tp: ExclusiveEnd, Weight: 1},
+	}
+	require.Equal(t, 2, GetMaxOverlapping(points))
+	// [1, 3], [2, 4], [3, 5]
+	points = []Endpoint{
+		{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{3}, Tp: InclusiveEnd, Weight: 1},
+		{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{4}, Tp: InclusiveEnd, Weight: 1},
+		{Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
+		{Key: []byte{5}, Tp: InclusiveEnd, Weight: 1},
+	}
+	require.Equal(t, 3, GetMaxOverlapping(points))
+}
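The tests above only exercise Weight == 1. A minimal sketch of how the weighted form behaves (hypothetical usage, not part of this patch; it assumes the exported identifiers above under the import path github.com/pingcap/tidb/br/pkg/lightning/backend/external):

    package main

    import (
    	"fmt"

    	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
    )

    func main() {
    	// Two weighted intervals [a, c) and [b, d); an endpoint with Weight N
    	// stands in for N identical file ranges.
    	points := []external.Endpoint{
    		{Key: []byte("a"), Tp: external.InclusiveStart, Weight: 3},
    		{Key: []byte("c"), Tp: external.ExclusiveEnd, Weight: 3},
    		{Key: []byte("b"), Tp: external.InclusiveStart, Weight: 2},
    		{Key: []byte("d"), Tp: external.ExclusiveEnd, Weight: 2},
    	}
    	// Both intervals cover [b, c), so the maximum overlap is 3 + 2 = 5.
    	fmt.Println(external.GetMaxOverlapping(points))
    }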
diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go
index 500e02af81..0b5a7897be 100644
--- a/br/pkg/lightning/backend/external/writer.go
+++ b/br/pkg/lightning/backend/external/writer.go
@@ -36,6 +36,8 @@ import (
 	"go.uber.org/zap"
 )
 
+var multiFileStatNum = 500 // files per MultipleFilesStat group; a variable so tests can lower it
+
 // rangePropertiesCollector collects range properties for each range. The zero
 // value of rangePropertiesCollector is not ready to use, should call reset()
 // first.
@@ -59,11 +61,12 @@
 
 // WriterSummary is the summary of a writer.
 type WriterSummary struct {
-	WriterID  int
-	Seq       int
-	Min       tidbkv.Key
-	Max       tidbkv.Key
-	TotalSize uint64
+	WriterID           int
+	Seq                int
+	Min                tidbkv.Key
+	Max                tidbkv.Key
+	TotalSize          uint64
+	MultipleFilesStats []MultipleFilesStat
 }
 
 // OnCloseFunc is the callback function when a writer is closed.
@@ -155,7 +158,7 @@ func (b *WriterBuilder) Build(
 	if b.dupeDetectEnabled {
 		keyAdapter = common.DupDetectKeyAdapter{}
 	}
-	return &Writer{
+	ret := &Writer{
 		rc: &rangePropertiesCollector{
 			props:    make([]*rangeProperty, 0, 1024),
 			currProp: &rangeProperty{},
 		},
@@ -173,7 +176,47 @@ func (b *WriterBuilder) Build(
 		kvStore: nil,
 		onClose: b.onClose,
 		closed:  false,
+		multiFileStats: make([]MultipleFilesStat, 1),
+		fileMinKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
+		fileMaxKeys:    make([]tidbkv.Key, 0, multiFileStatNum),
 	}
+	ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum)
+	return ret
+}
+
+// MultipleFilesStat is the statistic information of a group of files (at most
+// multiFileStatNum files per group). It is used to estimate data overlap,
+// since per-file statistic information may be too large to load into memory.
+type MultipleFilesStat struct {
+	MinKey            tidbkv.Key
+	MaxKey            tidbkv.Key
+	Filenames         [][2]string // [dataFile, statFile]
+	MaxOverlappingNum int
+}
+
+func (m *MultipleFilesStat) build(startKeys, endKeys []tidbkv.Key) {
+	if len(startKeys) == 0 {
+		return
+	}
+	m.MinKey = startKeys[0]
+	m.MaxKey = endKeys[0]
+	for i := 1; i < len(startKeys); i++ {
+		if m.MinKey.Cmp(startKeys[i]) > 0 {
+			m.MinKey = startKeys[i]
+		}
+		if m.MaxKey.Cmp(endKeys[i]) < 0 {
+			m.MaxKey = endKeys[i]
+		}
+	}
+
+	points := make([]Endpoint, 0, len(startKeys)*2)
+	for _, k := range startKeys {
+		points = append(points, Endpoint{Key: k, Tp: InclusiveStart, Weight: 1})
+	}
+	for _, k := range endKeys {
+		points = append(points, Endpoint{Key: k, Tp: InclusiveEnd, Weight: 1})
+	}
+	m.MaxOverlappingNum = GetMaxOverlapping(points)
 }
 
 // Writer is used to write data into external storage.
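To see why MaxOverlappingNum is kept per group: a consumer can derive a conservative global overlap estimate without loading per-file statistics, by treating each group as a single weighted interval [MinKey, MaxKey]. The helper below is a hypothetical sketch (not part of this patch) that would live in this package and reuse GetMaxOverlapping from util.go:

    // estimateOverlap returns an upper bound on how many of the flushed sorted
    // files can overlap at any single key, using only per-group statistics.
    func estimateOverlap(stats []MultipleFilesStat) int {
    	points := make([]Endpoint, 0, len(stats)*2)
    	for _, s := range stats {
    		// each group contributes its own worst-case overlap as the weight
    		points = append(points, Endpoint{Key: s.MinKey, Tp: InclusiveStart, Weight: uint64(s.MaxOverlappingNum)})
    		points = append(points, Endpoint{Key: s.MaxKey, Tp: InclusiveEnd, Weight: uint64(s.MaxOverlappingNum)})
    	}
    	return GetMaxOverlapping(points)
    }

This over-counts inside a group (the whole [MinKey, MaxKey] span is charged the group's maximum), so the result is an upper bound, never an underestimate.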
@@ -198,6 +241,11 @@ type Writer struct {
 	// Statistic information per batch.
 	batchSize uint64
 
+	// Statistic information per group of multiFileStatNum flushed files.
+	multiFileStats []MultipleFilesStat
+	fileMinKeys    []tidbkv.Key
+	fileMaxKeys    []tidbkv.Key
+
 	// Statistic information per writer.
 	minKey tidbkv.Key
 	maxKey tidbkv.Key
@@ -218,7 +266,7 @@ func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
 		w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
 		if w.batchSize >= w.memSizeLimit {
-			if err := w.flushKVs(ctx); err != nil {
+			if err := w.flushKVs(ctx, false); err != nil {
 				return err
 			}
 		}
 	}
@@ -238,10 +286,18 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	}
 	w.closed = true
 	defer w.kvBuffer.Destroy()
-	err := w.flushKVs(ctx)
+	err := w.flushKVs(ctx, true)
 	if err != nil {
 		return status(false), err
 	}
+	// drop the trailing MultipleFilesStat if it's empty; otherwise the data
+	// ended exactly at a flush boundary and its statistics must be built here.
+	last := len(w.multiFileStats) - 1
+	if len(w.multiFileStats[last].Filenames) == 0 {
+		w.multiFileStats = w.multiFileStats[:last]
+	} else {
+		w.multiFileStats[last].build(w.fileMinKeys, w.fileMaxKeys)
+	}
 
 	logutil.Logger(ctx).Info("close writer",
 		zap.Int("writerID", w.writerID),
@@ -251,11 +307,12 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	w.writeBatch = nil
 
 	w.onClose(&WriterSummary{
-		WriterID:  w.writerID,
-		Seq:       w.currentSeq,
-		Min:       w.minKey,
-		Max:       w.maxKey,
-		TotalSize: w.totalSize,
+		WriterID:           w.writerID,
+		Seq:                w.currentSeq,
+		Min:                w.minKey,
+		Max:                w.maxKey,
+		TotalSize:          w.totalSize,
+		MultipleFilesStats: w.multiFileStats,
 	})
 	return status(true), nil
 }
@@ -277,13 +334,13 @@ func (s status) Flushed() bool {
 	return bool(s)
 }
 
-func (w *Writer) flushKVs(ctx context.Context) (err error) {
+func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
 	if len(w.writeBatch) == 0 {
 		return nil
 	}
 
 	logger := logutil.Logger(ctx)
-	dataWriter, statWriter, err := w.createStorageWriter(ctx)
+	dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
 	if err != nil {
 		return err
 	}
@@ -339,6 +396,23 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
 
 	w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize)
 
+	// maintain the per-group statistics; a group covers up to multiFileStatNum files
+
+	l := len(w.multiFileStats)
+	w.multiFileStats[l-1].Filenames = append(w.multiFileStats[l-1].Filenames,
+		[2]string{dataFile, statFile},
+	)
+	w.fileMinKeys = append(w.fileMinKeys, tidbkv.Key(w.writeBatch[0].Key).Clone())
+	w.fileMaxKeys = append(w.fileMaxKeys, tidbkv.Key(w.writeBatch[len(w.writeBatch)-1].Key).Clone())
+	if fromClose || len(w.multiFileStats[l-1].Filenames) == multiFileStatNum {
+		w.multiFileStats[l-1].build(w.fileMinKeys, w.fileMaxKeys)
+		w.multiFileStats = append(w.multiFileStats, MultipleFilesStat{
+			Filenames: make([][2]string, 0, multiFileStatNum),
+		})
+		w.fileMinKeys = w.fileMinKeys[:0]
+		w.fileMaxKeys = w.fileMaxKeys[:0]
+	}
+
 	w.writeBatch = w.writeBatch[:0]
 	w.rc.reset()
 	w.kvBuffer.Reset()
@@ -347,16 +421,20 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
 	return nil
 }
 
-func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.ExternalFileWriter, err error) {
+func (w *Writer) createStorageWriter(ctx context.Context) (
+	dataFile, statFile string,
+	data, stats storage.ExternalFileWriter,
+	err error,
+) {
 	dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq))
 	dataWriter, err := w.store.Create(ctx, dataPath, nil)
 	if err != nil {
-		return nil, nil, err
+		return "", "", nil, nil, err
 	}
 	statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq))
 	statsWriter, err := w.store.Create(ctx, statPath, nil)
 	if err != nil {
-		return nil, nil, err
+		return "", "", nil, nil, err
 	}
-	return dataWriter, statsWriter, nil
+	return dataPath, statPath, dataWriter, statsWriter, nil
 }
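For readers of the tests below: createStorageWriter derives both paths from the writer's filenamePrefix (for writer 0 under "/test" that is "/test/0") and the flush sequence number; statSuffix is "_stat" in this package. A hypothetical standalone sketch of the resulting layout:

    package main

    import (
    	"fmt"
    	"path/filepath"
    	"strconv"
    )

    func main() {
    	prefix := "/test/0" // {externalPath}/{writerID}, as in TestWriterMultiFileStat
    	for seq := 0; seq < 3; seq++ {
    		dataPath := filepath.Join(prefix, strconv.Itoa(seq))         // /test/0/<seq>
    		statPath := filepath.Join(prefix+"_stat", strconv.Itoa(seq)) // /test/0_stat/<seq>
    		fmt.Println(dataPath, statPath)
    	}
    }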
diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go
index 243d0cb6eb..2692978d08 100644
--- a/br/pkg/lightning/backend/external/writer_test.go
+++ b/br/pkg/lightning/backend/external/writer_test.go
@@ -212,3 +212,122 @@ func TestWriterDuplicateDetect(t *testing.T) {
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "found duplicate key")
 }
+
+func TestMultiFileStat(t *testing.T) {
+	s := &MultipleFilesStat{}
+	// [3, 5], [1, 3], [2, 4]
+	startKeys := []dbkv.Key{{3}, {1}, {2}}
+	endKeys := []dbkv.Key{{5}, {3}, {4}}
+	s.build(startKeys, endKeys)
+	require.EqualValues(t, []byte{1}, s.MinKey)
+	require.EqualValues(t, []byte{5}, s.MaxKey)
+	require.EqualValues(t, 3, s.MaxOverlappingNum)
+}
+
+func TestWriterMultiFileStat(t *testing.T) {
+	oldMultiFileStatNum := multiFileStatNum
+	t.Cleanup(func() {
+		multiFileStatNum = oldMultiFileStatNum
+	})
+	multiFileStatNum = 3
+
+	ctx := context.Background()
+	memStore := storage.NewMemStorage()
+	var summary *WriterSummary
+
+	writer := NewWriterBuilder().
+		SetPropKeysDistance(2).
+		SetMemorySizeLimit(20). // 2 KV pairs (10 bytes each) will trigger a flush
+		SetOnCloseFunc(func(s *WriterSummary) {
+			summary = s
+		}).
+		Build(memStore, "/test", 0)
+
+	kvs := make([]common.KvPair, 0, 18)
+	// [key01, key02], [key03, key04], [key05, key06]
+	for i := 1; i <= 6; i++ {
+		kvs = append(kvs, common.KvPair{
+			Key: []byte(fmt.Sprintf("key%02d", i)),
+			Val: []byte("56789"),
+		})
+	}
+	// [key11, key13], [key12, key15], [key14, key16]
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key11"),
+		Val: []byte("56789"),
+	})
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key13"),
+		Val: []byte("56789"),
+	})
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key12"),
+		Val: []byte("56789"),
+	})
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key15"),
+		Val: []byte("56789"),
+	})
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key14"),
+		Val: []byte("56789"),
+	})
+	kvs = append(kvs, common.KvPair{
+		Key: []byte("key16"),
+		Val: []byte("56789"),
+	})
+	// [key20, key22], [key21, key23], [key22, key24]
+	for i := 0; i < 3; i++ {
+		kvs = append(kvs, common.KvPair{
+			Key: []byte(fmt.Sprintf("key2%d", i)),
+			Val: []byte("56789"),
+		})
+		kvs = append(kvs, common.KvPair{
+			Key: []byte(fmt.Sprintf("key2%d", i+2)),
+			Val: []byte("56789"),
+		})
+	}
+
+	rows := kv.MakeRowsFromKvPairs(kvs)
+	err := writer.AppendRows(ctx, nil, rows)
+	require.NoError(t, err)
+	_, err = writer.Close(ctx)
+	require.NoError(t, err)
+
+	require.Equal(t, 3, len(summary.MultipleFilesStats))
+	expected := MultipleFilesStat{
+		MinKey: []byte("key01"),
+		MaxKey: []byte("key06"),
+		Filenames: [][2]string{
+			{"/test/0/0", "/test/0_stat/0"},
+			{"/test/0/1", "/test/0_stat/1"},
+			{"/test/0/2", "/test/0_stat/2"},
+		},
+		MaxOverlappingNum: 1,
+	}
+	require.Equal(t, expected, summary.MultipleFilesStats[0])
+	expected = MultipleFilesStat{
+		MinKey: []byte("key11"),
+		MaxKey: []byte("key16"),
+		Filenames: [][2]string{
+			{"/test/0/3", "/test/0_stat/3"},
+			{"/test/0/4", "/test/0_stat/4"},
+			{"/test/0/5", "/test/0_stat/5"},
+		},
+		MaxOverlappingNum: 2,
+	}
+	require.Equal(t, expected, summary.MultipleFilesStats[1])
+	expected = MultipleFilesStat{
+		MinKey: []byte("key20"),
+		MaxKey: []byte("key24"),
+		Filenames: [][2]string{
+			{"/test/0/6", "/test/0_stat/6"},
+			{"/test/0/7", "/test/0_stat/7"},
+			{"/test/0/8", "/test/0_stat/8"},
+		},
+		MaxOverlappingNum: 3,
+	}
+	require.Equal(t, expected, summary.MultipleFilesStats[2])
+	require.EqualValues(t, "key01", summary.Min)
+	require.EqualValues(t, "key24", summary.Max)
+}
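The third expectation can be cross-checked against GetMaxOverlapping directly: [key20, key22], [key21, key23], and [key22, key24] all contain "key22", since flushed files are closed intervals (InclusiveEnd endpoints). A hypothetical standalone check, assuming the import path github.com/pingcap/tidb/br/pkg/lightning/backend/external:

    package main

    import (
    	"fmt"

    	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
    )

    func main() {
    	points := []external.Endpoint{
    		{Key: []byte("key20"), Tp: external.InclusiveStart, Weight: 1},
    		{Key: []byte("key22"), Tp: external.InclusiveEnd, Weight: 1},
    		{Key: []byte("key21"), Tp: external.InclusiveStart, Weight: 1},
    		{Key: []byte("key23"), Tp: external.InclusiveEnd, Weight: 1},
    		{Key: []byte("key22"), Tp: external.InclusiveStart, Weight: 1},
    		{Key: []byte("key24"), Tp: external.InclusiveEnd, Weight: 1},
    	}
    	fmt.Println(external.GetMaxOverlapping(points)) // prints 3
    }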