326 lines
10 KiB
Go
326 lines
10 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package external
|
|
|
|
import (
|
|
"bytes"
|
|
"container/heap"
|
|
"context"
|
|
"math"
|
|
"slices"
|
|
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"github.com/pingcap/tidb/pkg/util/size"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// exhaustedHeapElem is one entry of exhaustedHeap: a data/stat file pair
// together with the key that decides when the pair can be treated as exhausted.
type exhaustedHeapElem struct {
	// key orders the elements inside the heap (bytewise comparison).
	// RangeSplitter fills it with the lastKey of the last range property it
	// saw before pushing the entry.
	key []byte
	// dataFile and statFile are the paths of the file pair.
	dataFile string
	statFile string
}

// exhaustedHeap is a min-heap of exhaustedHeapElem ordered by key. It
// implements heap.Interface and is meant to be driven through heap.Push and
// heap.Pop, so that index 0 always holds the element with the smallest key.
type exhaustedHeap []exhaustedHeapElem

// Compile-time check that *exhaustedHeap satisfies heap.Interface.
var _ heap.Interface = (*exhaustedHeap)(nil)

// Len implements heap.Interface and returns the number of elements.
func (h exhaustedHeap) Len() int {
	return len(h)
}

// Less implements heap.Interface. The element at i sorts before the element
// at j when its key is bytewise smaller.
func (h exhaustedHeap) Less(i, j int) bool {
	return bytes.Compare(h[i].key, h[j].key) < 0
}

// Swap implements heap.Interface and exchanges the elements at i and j.
func (h exhaustedHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push implements heap.Interface and appends x, which must be an
// exhaustedHeapElem, to the end of the slice.
func (h *exhaustedHeap) Push(x any) {
	*h = append(*h, x.(exhaustedHeapElem))
}

// Pop implements heap.Interface. It removes the last element of the slice and
// returns it as an exhaustedHeapElem.
func (h *exhaustedHeap) Pop() any {
	old := *h
	last := len(old) - 1
	elem := old[last]
	// Clear the vacated slot so the backing array stops referencing the popped
	// key and file names; otherwise they stay reachable until the slot is
	// overwritten by a later Push.
	old[last] = exhaustedHeapElem{}
	*h = old[:last]
	return elem
}
|
|
|
|
// CalRangeSize calculates the range size and range keys.
|
|
// see writeStepMemShareCount for more info.
|
|
func CalRangeSize(memPerCore int64, regionSplitSize, regionSplitKeys int64) (int64, int64) {
|
|
ss := int64(float64(memPerCore) / writeStepMemShareCount)
|
|
var rangeSize int64
|
|
if ss < regionSplitSize {
|
|
rangeCnt := int64(math.Ceil(float64(regionSplitSize) / float64(ss)))
|
|
rangeSize = regionSplitSize/rangeCnt + 1
|
|
} else {
|
|
rangeSize = (ss / regionSplitSize) * regionSplitSize
|
|
}
|
|
avgKeySize := float64(regionSplitSize) / float64(regionSplitKeys)
|
|
return rangeSize, int64(float64(rangeSize) / avgKeySize)
|
|
}
|
|
|
|
// RangeSplitter is used to split key ranges of an external engine. Please see
|
|
// NewRangeSplitter and SplitOneRangesGroup for more details.
|
|
type RangeSplitter struct {
|
|
rangesGroupSize int64
|
|
rangesGroupKeys int64
|
|
|
|
rangeJobSize int64
|
|
rangeJobKeyCnt int64
|
|
rangeJobKeys [][]byte
|
|
|
|
regionSplitSize int64
|
|
regionSplitKeyCnt int64
|
|
regionSplitKeys [][]byte
|
|
|
|
propIter *MergePropIter
|
|
multiFileStat []MultipleFilesStat
|
|
|
|
// filename -> 2 level index in dataFiles/statFiles
|
|
activeDataFiles map[string][2]int
|
|
activeStatFiles map[string][2]int
|
|
curGroupSize int64
|
|
curGroupKeyCnt int64
|
|
curRangeJobSize int64
|
|
curRangeJobKeyCnt int64
|
|
recordRangeJobAfterNextProp bool
|
|
curRegionSplitSize int64
|
|
curRegionSplitKeyCnt int64
|
|
recordRegionSplitAfterNextProp bool
|
|
lastDataFile string
|
|
lastStatFile string
|
|
lastRangeProperty *rangeProperty
|
|
willExhaustHeap exhaustedHeap
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewRangeSplitter creates a new RangeSplitter to process the stat files of
|
|
// `multiFileStat` stored in `externalStorage`.
|
|
//
|
|
// `rangesGroupSize` and `rangesGroupKeyCnt` controls the total size and key
|
|
// count limit of the ranges group returned by one `SplitOneRangesGroup`
|
|
// invocation. The ranges group may contain multiple range jobs and region split
|
|
// keys. The size and keys limit of one range job are controlled by
|
|
// `rangeJobSize` and `rangeJobKeyCnt`. The size and keys limit of intervals of
|
|
// region split keys are controlled by `regionSplitSize` and `regionSplitKeyCnt`.
|
|
func NewRangeSplitter(
|
|
ctx context.Context,
|
|
multiFileStat []MultipleFilesStat,
|
|
externalStorage storeapi.Storage,
|
|
rangesGroupSize, rangesGroupKeyCnt int64,
|
|
rangeJobSize, rangeJobKeyCnt int64,
|
|
regionSplitSize, regionSplitKeyCnt int64,
|
|
) (*RangeSplitter, error) {
|
|
logger := logutil.Logger(ctx)
|
|
logger.Info("create range splitter",
|
|
zap.Int64("rangesGroupSize", rangesGroupSize),
|
|
zap.Int64("rangesGroupKeyCnt", rangesGroupKeyCnt),
|
|
zap.Int64("rangeJobSize", rangeJobSize),
|
|
zap.Int64("rangeJobKeyCnt", rangeJobKeyCnt),
|
|
zap.Int64("regionSplitSize", regionSplitSize),
|
|
zap.Int64("regionSplitKeyCnt", regionSplitKeyCnt),
|
|
)
|
|
propIter, err := NewMergePropIter(ctx, multiFileStat, externalStorage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &RangeSplitter{
|
|
rangesGroupSize: rangesGroupSize,
|
|
rangesGroupKeys: rangesGroupKeyCnt,
|
|
propIter: propIter,
|
|
multiFileStat: multiFileStat,
|
|
activeDataFiles: make(map[string][2]int),
|
|
activeStatFiles: make(map[string][2]int),
|
|
|
|
rangeJobSize: rangeJobSize,
|
|
rangeJobKeyCnt: rangeJobKeyCnt,
|
|
rangeJobKeys: make([][]byte, 0, 16),
|
|
|
|
regionSplitSize: regionSplitSize,
|
|
regionSplitKeyCnt: regionSplitKeyCnt,
|
|
regionSplitKeys: make([][]byte, 0, 16),
|
|
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
// Close release the resources of RangeSplitter.
|
|
func (r *RangeSplitter) Close() error {
|
|
err := r.propIter.Close()
|
|
if err != nil {
|
|
r.logger.Error("close range splitter error", zap.Error(err))
|
|
return err
|
|
}
|
|
r.logger.Info("close range splitter")
|
|
return nil
|
|
}
|
|
|
|
// SplitOneRangesGroup splits one ranges group may contain multiple range jobs
|
|
// and region split keys. `endKeyOfGroup` represents the end key of the group,
|
|
// but it will be nil when the group is the last one. `dataFiles` and `statFiles`
|
|
// are all the files that have overlapping key ranges in this group.
|
|
// `interiorRangeJobKeys` are the interior boundary keys of the range jobs, the
|
|
// range can be constructed with start/end key at caller.
|
|
// `interiorRegionSplitKeys` are the split keys that will be used later to split
|
|
// regions.
|
|
func (r *RangeSplitter) SplitOneRangesGroup() (
|
|
endKeyOfGroup []byte,
|
|
dataFiles []string,
|
|
statFiles []string,
|
|
interiorRangeJobKeys [][]byte,
|
|
interiorRegionSplitKeys [][]byte,
|
|
err error,
|
|
) {
|
|
var (
|
|
exhaustedDataFiles, exhaustedStatFiles []string
|
|
retDataFiles, retStatFiles []string
|
|
returnAfterNextProp = false
|
|
)
|
|
|
|
for r.propIter.Next() {
|
|
if err = r.propIter.Error(); err != nil {
|
|
return nil, nil, nil, nil, nil, err
|
|
}
|
|
prop := r.propIter.prop()
|
|
r.curGroupSize += int64(prop.size)
|
|
r.curRangeJobSize += int64(prop.size)
|
|
r.curRegionSplitSize += int64(prop.size)
|
|
r.curGroupKeyCnt += int64(prop.keys)
|
|
r.curRangeJobKeyCnt += int64(prop.keys)
|
|
r.curRegionSplitKeyCnt += int64(prop.keys)
|
|
|
|
// if this Next call will close the last reader
|
|
if *r.propIter.baseCloseReaderFlag {
|
|
heap.Push(&r.willExhaustHeap, exhaustedHeapElem{
|
|
key: r.lastRangeProperty.lastKey,
|
|
dataFile: r.lastDataFile,
|
|
statFile: r.lastStatFile,
|
|
})
|
|
}
|
|
|
|
idx, idx2 := r.propIter.readerIndex()
|
|
filePair := r.multiFileStat[idx].Filenames[idx2]
|
|
dataFilePath := filePair[0]
|
|
statFilePath := filePair[1]
|
|
r.activeDataFiles[dataFilePath] = [2]int{idx, idx2}
|
|
r.activeStatFiles[statFilePath] = [2]int{idx, idx2}
|
|
r.lastDataFile = dataFilePath
|
|
r.lastStatFile = statFilePath
|
|
r.lastRangeProperty = prop
|
|
|
|
for r.willExhaustHeap.Len() > 0 &&
|
|
bytes.Compare(r.willExhaustHeap[0].key, prop.firstKey) < 0 {
|
|
exhaustedDataFiles = append(exhaustedDataFiles, r.willExhaustHeap[0].dataFile)
|
|
exhaustedStatFiles = append(exhaustedStatFiles, r.willExhaustHeap[0].statFile)
|
|
heap.Pop(&r.willExhaustHeap)
|
|
}
|
|
|
|
if returnAfterNextProp {
|
|
for _, p := range exhaustedDataFiles {
|
|
delete(r.activeDataFiles, p)
|
|
}
|
|
exhaustedDataFiles = exhaustedDataFiles[:0]
|
|
for _, p := range exhaustedStatFiles {
|
|
delete(r.activeStatFiles, p)
|
|
}
|
|
exhaustedStatFiles = exhaustedStatFiles[:0]
|
|
return prop.firstKey, retDataFiles, retStatFiles, r.takeRangeJobKeys(), r.takeRegionSplitKeys(), nil
|
|
}
|
|
if r.recordRangeJobAfterNextProp {
|
|
r.rangeJobKeys = append(r.rangeJobKeys, slices.Clone(prop.firstKey))
|
|
r.recordRangeJobAfterNextProp = false
|
|
}
|
|
if r.recordRegionSplitAfterNextProp {
|
|
r.regionSplitKeys = append(r.regionSplitKeys, slices.Clone(prop.firstKey))
|
|
r.recordRegionSplitAfterNextProp = false
|
|
}
|
|
|
|
// each KV need additional memory for 2 slice.
|
|
// we can enhance it later using SliceLocation.
|
|
rangeMemSize := r.curRangeJobSize + r.curRangeJobKeyCnt*size.SizeOfSlice*2
|
|
if rangeMemSize >= r.rangeJobSize || r.curRangeJobKeyCnt >= r.rangeJobKeyCnt {
|
|
r.curRangeJobSize = 0
|
|
r.curRangeJobKeyCnt = 0
|
|
r.recordRangeJobAfterNextProp = true
|
|
}
|
|
|
|
if r.curRegionSplitSize >= r.regionSplitSize || r.curRegionSplitKeyCnt >= r.regionSplitKeyCnt {
|
|
r.curRegionSplitSize = 0
|
|
r.curRegionSplitKeyCnt = 0
|
|
r.recordRegionSplitAfterNextProp = true
|
|
}
|
|
|
|
if r.curGroupSize >= r.rangesGroupSize || r.curGroupKeyCnt >= r.rangesGroupKeys {
|
|
retDataFiles, retStatFiles = r.cloneActiveFiles()
|
|
|
|
r.curGroupSize = 0
|
|
r.curGroupKeyCnt = 0
|
|
returnAfterNextProp = true
|
|
}
|
|
}
|
|
|
|
retDataFiles, retStatFiles = r.cloneActiveFiles()
|
|
r.activeDataFiles = make(map[string][2]int)
|
|
r.activeStatFiles = make(map[string][2]int)
|
|
return nil, retDataFiles, retStatFiles, r.takeRangeJobKeys(), r.takeRegionSplitKeys(), r.propIter.Error()
|
|
}
|
|
|
|
func (r *RangeSplitter) cloneActiveFiles() (data []string, stat []string) {
|
|
dataFiles := make([]string, 0, len(r.activeDataFiles))
|
|
for path := range r.activeDataFiles {
|
|
dataFiles = append(dataFiles, path)
|
|
}
|
|
slices.SortFunc(dataFiles, func(i, j string) int {
|
|
iInts := r.activeDataFiles[i]
|
|
jInts := r.activeDataFiles[j]
|
|
if iInts[0] != jInts[0] {
|
|
return iInts[0] - jInts[0]
|
|
}
|
|
return iInts[1] - jInts[1]
|
|
})
|
|
statFiles := make([]string, 0, len(r.activeStatFiles))
|
|
for path := range r.activeStatFiles {
|
|
statFiles = append(statFiles, path)
|
|
}
|
|
slices.SortFunc(statFiles, func(i, j string) int {
|
|
iInts := r.activeStatFiles[i]
|
|
jInts := r.activeStatFiles[j]
|
|
if iInts[0] != jInts[0] {
|
|
return iInts[0] - jInts[0]
|
|
}
|
|
return iInts[1] - jInts[1]
|
|
})
|
|
return dataFiles, statFiles
|
|
}
|
|
|
|
func (r *RangeSplitter) takeRangeJobKeys() [][]byte {
|
|
ret := make([][]byte, len(r.rangeJobKeys))
|
|
copy(ret, r.rangeJobKeys)
|
|
r.rangeJobKeys = r.rangeJobKeys[:0]
|
|
return ret
|
|
}
|
|
|
|
func (r *RangeSplitter) takeRegionSplitKeys() [][]byte {
|
|
ret := make([][]byte, len(r.regionSplitKeys))
|
|
copy(ret, r.regionSplitKeys)
|
|
r.regionSplitKeys = r.regionSplitKeys[:0]
|
|
return ret
|
|
}
|