lightning: extract split range for local/external engine (#46542)
ref pingcap/tidb#45719
br/pkg/lightning/backend/external/engine.go
@@ -38,6 +38,7 @@ type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
splitKeys [][]byte
bufPool *membuf.Pool

iter *MergeKVIter
@@ -48,6 +49,9 @@ type Engine struct {
dupDetectOpt common.DupDetectOpt
ts uint64

totalKVSize int64
totalKVLength int64

importedKVSize *atomic.Int64
importedKVCount *atomic.Int64
}
@@ -62,6 +66,8 @@ func NewExternalEngine(
duplicateDB *pebble.DB,
dupDetectOpt common.DupDetectOpt,
ts uint64,
totalKVSize int64,
totakKVLength int64,
) common.Engine {
return &Engine{
storage: storage,
@@ -73,6 +79,8 @@ func NewExternalEngine(
duplicateDB: duplicateDB,
dupDetectOpt: dupDetectOpt,
ts: ts,
totalKVSize: totalKVSize,
totalKVLength: totakKVLength,
importedKVSize: atomic.NewInt64(0),
importedKVCount: atomic.NewInt64(0),
}
@@ -169,6 +177,42 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
return iter, nil
}

// KVStatistics returns the total kv size and total kv length.
func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) {
return e.totalKVSize, e.totalKVLength
}

// ImportedStatistics returns the imported kv size and imported kv length.
func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) {
return e.importedKVSize.Load(), e.importedKVCount.Load()
}

// ID is the identifier of an engine.
func (e *Engine) ID() string {
return "external"
}

// SplitRanges split the ranges by split keys provided by external engine.
func (e *Engine) SplitRanges(
startKey, endKey []byte,
_, _ int64,
_ log.Logger,
) ([]common.Range, error) {
splitKeys := e.splitKeys
ranges := make([]common.Range, 0, len(splitKeys)+1)
ranges = append(ranges, common.Range{Start: startKey})
for i := 0; i < len(splitKeys); i++ {
ranges[len(ranges)-1].End = splitKeys[i]
var endK []byte
if i < len(splitKeys)-1 {
endK = splitKeys[i+1]
}
ranges = append(ranges, common.Range{Start: splitKeys[i], End: endK})
}
ranges[len(ranges)-1].End = endKey
return ranges, nil
}
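A minimal standalone sketch of the split-keys walk above (editor's illustration, not part of this commit; the rng type and string keys stand in for common.Range and []byte keys): each split key closes the previous range and opens the next one, and the caller-provided start and end keys cap both ends.

package main

import "fmt"

type rng struct{ start, end string }

// buildRanges mirrors the splitKeys-based SplitRanges loop above.
func buildRanges(startKey, endKey string, splitKeys []string) []rng {
    ranges := make([]rng, 0, len(splitKeys)+1)
    ranges = append(ranges, rng{start: startKey})
    for i := 0; i < len(splitKeys); i++ {
        ranges[len(ranges)-1].end = splitKeys[i]
        var endK string
        if i < len(splitKeys)-1 {
            endK = splitKeys[i+1]
        }
        ranges = append(ranges, rng{start: splitKeys[i], end: endK})
    }
    ranges[len(ranges)-1].end = endKey
    return ranges
}

func main() {
    // [a, d) with split keys b and c yields [a,b) [b,c) [c,d).
    fmt.Println(buildRanges("a", "d", []string{"b", "c"}))
}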

// Close releases the resources of the engine.
func (e *Engine) Close() error {
if e.iter == nil {

@@ -673,13 +673,13 @@ func (m *DupeDetector) splitLocalDupTaskByKeys(
if err != nil {
return nil, errors.Trace(err)
}
ranges := splitRangeBySizeProps(Range{start: task.StartKey, end: task.EndKey}, sizeProps, sizeLimit, keysLimit)
ranges := splitRangeBySizeProps(common.Range{Start: task.StartKey, End: task.EndKey}, sizeProps, sizeLimit, keysLimit)
newDupTasks := make([]dupTask, 0, len(ranges))
for _, r := range ranges {
newDupTasks = append(newDupTasks, dupTask{
KeyRange: tidbkv.KeyRange{
StartKey: r.start,
EndKey: r.end,
StartKey: r.Start,
EndKey: r.End,
},
tableID: task.tableID,
indexInfo: task.indexInfo,

@@ -87,10 +87,10 @@ type engineMeta struct {

type syncedRanges struct {
sync.Mutex
ranges []Range
ranges []common.Range
}

func (r *syncedRanges) add(g Range) {
func (r *syncedRanges) add(g common.Range) {
r.Lock()
r.ranges = append(r.ranges, g)
r.Unlock()
@@ -275,6 +275,41 @@ func (e *Engine) TotalMemorySize() int64 {
return memSize
}

// KVStatistics returns the total kv size and total kv length.
func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) {
return e.TotalSize.Load(), e.Length.Load()
}

// ImportedStatistics returns the imported kv size and imported kv length.
func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) {
return e.importedKVSize.Load(), e.importedKVCount.Load()
}

// ID is the identifier of an engine.
func (e *Engine) ID() string {
return e.UUID.String()
}

// SplitRanges gets size properties from pebble and split ranges according to size/keys limit.
func (e *Engine) SplitRanges(
startKey, endKey []byte,
sizeLimit, keysLimit int64,
logger log.Logger,
) ([]common.Range, error) {
sizeProps, err := getSizePropertiesFn(logger, e.getDB(), e.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}

ranges := splitRangeBySizeProps(
common.Range{Start: startKey, End: endKey},
sizeProps,
sizeLimit,
keysLimit,
)
return ranges, nil
}

type rangeOffsets struct {
Size uint64
Keys uint64

@@ -233,13 +233,6 @@ func (c loggingConn) Write(b []byte) (int, error) {
return c.Conn.Write(b)
}

// Range record start and end key for localStoreDir.DB
// so we can write it to tikv in streaming
type Range struct {
start []byte
end []byte // end is always exclusive except import_sstpb.SSTMeta
}

type encodingBuilder struct {
metrics *metric.Metrics
}
@@ -985,23 +978,23 @@ func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst.
return local.importClientFactory.Create(ctx, storeID)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
func splitRangeBySizeProps(fullRange common.Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []common.Range {
ranges := make([]common.Range, 0, sizeProps.totalSize/uint64(sizeLimit))
curSize := uint64(0)
curKeys := uint64(0)
curKey := fullRange.start
curKey := fullRange.Start

sizeProps.iter(func(p *rangeProperty) bool {
if bytes.Compare(p.Key, curKey) <= 0 {
return true
}
if bytes.Compare(p.Key, fullRange.end) > 0 {
if bytes.Compare(p.Key, fullRange.End) > 0 {
return false
}
curSize += p.Size
curKeys += p.Keys
if int64(curSize) >= sizeLimit || int64(curKeys) >= keysLimit {
ranges = append(ranges, Range{start: curKey, end: p.Key})
ranges = append(ranges, common.Range{Start: curKey, End: p.Key})
curKey = p.Key
curSize = 0
curKeys = 0
@@ -1009,23 +1002,23 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit
return true
})

if bytes.Compare(curKey, fullRange.end) < 0 {
if bytes.Compare(curKey, fullRange.End) < 0 {
// If the remaining range is too small, append it to last range.
if len(ranges) > 0 && curKeys == 0 {
ranges[len(ranges)-1].end = fullRange.end
ranges[len(ranges)-1].End = fullRange.End
} else {
ranges = append(ranges, Range{start: curKey, end: fullRange.end})
ranges = append(ranges, common.Range{Start: curKey, End: fullRange.End})
}
}
return ranges
}
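For orientation, here is a condensed standalone sketch of the size/keys-limited walk that splitRangeBySizeProps performs (editor's illustration, not from the commit; string keys and the prop struct stand in for []byte keys and pebble's rangeProperty entries): accumulate size and key counts while scanning the sorted property boundaries, cut a range whenever a limit is reached, then merge or append the remainder.

package main

import "fmt"

// prop is a stand-in for one rangeProperty entry: a boundary key plus the
// size and key count accumulated up to that key.
type prop struct {
    key        string
    size, keys int64
}

func splitBySize(start, end string, props []prop, sizeLimit, keysLimit int64) [][2]string {
    var ranges [][2]string
    curKey := start
    var curSize, curKeys int64
    for _, p := range props {
        if p.key <= curKey {
            continue
        }
        if p.key > end {
            break
        }
        curSize += p.size
        curKeys += p.keys
        if curSize >= sizeLimit || curKeys >= keysLimit {
            ranges = append(ranges, [2]string{curKey, p.key})
            curKey, curSize, curKeys = p.key, 0, 0
        }
    }
    if curKey < end {
        if len(ranges) > 0 && curKeys == 0 {
            ranges[len(ranges)-1][1] = end
        } else {
            ranges = append(ranges, [2]string{curKey, end})
        }
    }
    return ranges
}

func main() {
    props := []prop{{"b", 80, 8}, {"c", 80, 8}, {"d", 5, 1}}
    // Cuts at "b" and "c", then appends the small remainder [c, z).
    fmt.Println(splitBySize("a", "z", props, 80, 100))
}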

func (local *Backend) readAndSplitIntoRange(
func readAndSplitIntoRange(
ctx context.Context,
engine *Engine,
sizeLimit int64,
keysLimit int64,
) ([]Range, error) {
) ([]common.Range, error) {
firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil)
if err != nil {
return nil, err
@@ -1036,29 +1029,20 @@ func (local *Backend) readAndSplitIntoRange(

endKey := nextKey(lastKey)

engineFileTotalSize := engine.TotalSize.Load()
engineFileLength := engine.Length.Load()
engineFileTotalSize, engineFileLength := engine.KVStatistics()

if engineFileTotalSize <= sizeLimit && engineFileLength <= keysLimit {
ranges := []Range{{start: firstKey, end: endKey}}
ranges := []common.Range{{Start: firstKey, End: endKey}}
return ranges, nil
}

logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))
sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}

ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps,
sizeLimit, keysLimit)

logger.Info("split engine key ranges",
logger := log.FromContext(ctx).With(zap.String("engine", engine.ID()))
ranges, err := engine.SplitRanges(firstKey, endKey, sizeLimit, keysLimit, logger)
logger.Info("split local engine key ranges",
zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength),
logutil.Key("firstKey", firstKey), logutil.Key("lastKey", lastKey),
zap.Int("ranges", len(ranges)))

return ranges, nil
zap.Int("ranges", len(ranges)), zap.Error(err))
return ranges, err
}

// prepareAndSendJob will read the engine to get estimated key range,
@@ -1069,14 +1053,13 @@ func (local *Backend) readAndSplitIntoRange(
// seize the "first" error.
func (local *Backend) prepareAndSendJob(
ctx context.Context,
engine *Engine,
initialSplitRanges []Range,
engine common.Engine,
initialSplitRanges []common.Range,
regionSplitSize, regionSplitKeys int64,
jobToWorkerCh chan<- *regionJob,
jobWg *sync.WaitGroup,
) error {
lfTotalSize := engine.TotalSize.Load()
lfLength := engine.Length.Load()
lfTotalSize, lfLength := engine.KVStatistics()
log.FromContext(ctx).Info("import engine ranges", zap.Int("count", len(initialSplitRanges)))
if len(initialSplitRanges) == 0 {
return nil
@@ -1090,7 +1073,7 @@ func (local *Backend) prepareAndSendJob(
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
needSplit = true
})
logger := log.FromContext(ctx).With(zap.Stringer("uuid", engine.UUID)).Begin(zap.InfoLevel, "split and scatter ranges")
logger := log.FromContext(ctx).With(zap.String("uuid", engine.ID())).Begin(zap.InfoLevel, "split and scatter ranges")
for i := 0; i < maxRetryTimes; i++ {
failpoint.Inject("skipSplitAndScatter", func() {
failpoint.Break()
@@ -1101,7 +1084,7 @@ func (local *Backend) prepareAndSendJob(
break
}

log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engine.UUID),
log.FromContext(ctx).Warn("split and scatter failed in retry", zap.String("engine ID", engine.ID()),
log.ShortError(err), zap.Int("retry", i))
}
logger.End(zap.ErrorLevel, err)
@@ -1124,7 +1107,7 @@ func (local *Backend) prepareAndSendJob(
func (local *Backend) generateAndSendJob(
ctx context.Context,
engine common.Engine,
jobRanges []Range,
jobRanges []common.Range,
regionSplitSize, regionSplitKeys int64,
jobToWorkerCh chan<- *regionJob,
jobWg *sync.WaitGroup,
@@ -1136,16 +1119,15 @@ func (local *Backend) generateAndSendJob(
// when use dynamic region feature, the region may be very big, we need
// to split to smaller ranges to increase the concurrency.
if regionSplitSize > 2*int64(config.SplitRegionSize) && ok {
sizeProps, err := getSizePropertiesFn(logger, localEngine.getDB(), local.keyAdapter)
start := jobRanges[0].Start
end := jobRanges[len(jobRanges)-1].End
sizeLimit := int64(config.SplitRegionSize)
keysLimit := int64(config.SplitRegionKeys)
jrs, err := localEngine.SplitRanges(start, end, sizeLimit, keysLimit, logger)
if err != nil {
return errors.Trace(err)
}

jobRanges = splitRangeBySizeProps(
Range{start: jobRanges[0].start, end: jobRanges[len(jobRanges)-1].end},
sizeProps,
int64(config.SplitRegionSize),
int64(config.SplitRegionKeys))
jobRanges = jrs
}
logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))

@@ -1155,7 +1137,7 @@ func (local *Backend) generateAndSendJob(
eg.SetLimit(local.WorkerConcurrency)
for _, jobRange := range jobRanges {
r := jobRange
data, err := engine.LoadIngestData(ctx, r.start, r.end)
data, err := engine.LoadIngestData(ctx, r.Start, r.End)
if err != nil {
cancel()
err2 := eg.Wait()
@@ -1207,14 +1189,14 @@ var fakeRegionJobs map[[2]string]struct {
func (local *Backend) generateJobForRange(
ctx context.Context,
data common.IngestData,
keyRange Range,
keyRange common.Range,
regionSplitSize, regionSplitKeys int64,
) ([]*regionJob, error) {
failpoint.Inject("fakeRegionJobs", func() {
if ctx.Err() != nil {
failpoint.Return(nil, ctx.Err())
}
key := [2]string{string(keyRange.start), string(keyRange.end)}
key := [2]string{string(keyRange.Start), string(keyRange.End)}
injected := fakeRegionJobs[key]
// overwrite the stage to regionScanned, because some time same keyRange
// will be generated more than once.
@@ -1224,7 +1206,7 @@ func (local *Backend) generateJobForRange(
failpoint.Return(injected.jobs, injected.err)
})

start, end := keyRange.start, keyRange.end
start, end := keyRange.Start, keyRange.End
pairStart, pairEnd, err := data.GetFirstAndLastKey(start, end)
if err != nil {
return nil, err
@@ -1259,7 +1241,7 @@ func (local *Backend) generateJobForRange(
zap.Reflect("peers", region.Region.GetPeers()))

jobs = append(jobs, &regionJob{
keyRange: intersectRange(region.Region, Range{start: start, end: end}),
keyRange: intersectRange(region.Region, common.Range{Start: start, End: end}),
region: region,
stage: regionScanned,
ingestData: data,
@@ -1411,7 +1393,7 @@ func (local *Backend) executeJob(
if job.writeResult == nil || job.writeResult.remainingStartKey == nil {
return nil
}
job.keyRange.start = job.writeResult.remainingStartKey
job.keyRange.Start = job.writeResult.remainingStartKey
job.convertStageTo(regionScanned)
}
}
@@ -1425,8 +1407,7 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re
}
defer lf.unlock()

lfTotalSize := lf.TotalSize.Load()
lfLength := lf.Length.Load()
lfTotalSize, lfLength := lf.KVStatistics()
if lfTotalSize == 0 {
// engine is empty, this is likes because it's a index engine but the table contains no index
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
@@ -1445,7 +1426,7 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re
}

// split sorted file into range about regionSplitSize per file
regionRanges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys)
regionRanges, err := readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
@@ -1456,11 +1437,11 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re
defer cancel()

var startKey, endKey []byte
if len(regionRanges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, regionRanges[0].start)
if len(regionRanges[0].Start) > 0 {
startKey = codec.EncodeBytes(nil, regionRanges[0].Start)
}
if len(regionRanges[len(regionRanges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].end)
if len(regionRanges[len(regionRanges)-1].End) > 0 {
endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].End)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
@@ -1474,8 +1455,8 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re

if len(regionRanges) > 0 && local.BackendConfig.RaftKV2SwitchModeDuration > 0 {
log.FromContext(ctx).Info("switch import mode of ranges",
zap.String("startKey", hex.EncodeToString(regionRanges[0].start)),
zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].end)))
zap.String("startKey", hex.EncodeToString(regionRanges[0].Start)),
zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].End)))
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -1499,17 +1480,18 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re

err = local.doImport(ctx, lf, regionRanges, regionSplitSize, regionSplitKeys)
if err == nil {
importedSize, importedLength := lf.ImportedStatistics()
log.FromContext(ctx).Info("import engine success",
zap.Stringer("uuid", engineUUID),
zap.Int64("size", lfTotalSize),
zap.Int64("kvs", lfLength),
zap.Int64("importedSize", lf.importedKVSize.Load()),
zap.Int64("importedCount", lf.importedKVCount.Load()))
zap.Int64("importedSize", importedSize),
zap.Int64("importedCount", importedLength))
}
return err
}

func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges []Range, regionSplitSize, regionSplitKeys int64) error {
func (local *Backend) doImport(ctx context.Context, engine common.Engine, regionRanges []common.Range, regionSplitSize, regionSplitKeys int64) error {
/*
[prepareAndSendJob]-----jobToWorkerCh--->[workers]
^ |
@@ -1571,8 +1553,8 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
}
job.waitUntil = time.Now().Add(time.Second * time.Duration(sleepSecond))
log.FromContext(ctx).Info("put job back to jobCh to retry later",
logutil.Key("startKey", job.keyRange.start),
logutil.Key("endKey", job.keyRange.end),
logutil.Key("startKey", job.keyRange.Start),
logutil.Key("endKey", job.keyRange.End),
zap.Stringer("stage", job.stage),
zap.Int("retryCount", job.retryCount),
zap.Time("waitUntil", job.waitUntil))
@@ -1741,19 +1723,19 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
// SwitchModeByKeyRanges will switch tikv mode for regions in the specific key range for multirocksdb.
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []Range) (<-chan struct{}, error) {
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
for _, r := range ranges {
startKey := r.start
if len(r.start) > 0 {
startKey = codec.EncodeBytes(nil, r.start)
startKey := r.Start
if len(r.Start) > 0 {
startKey = codec.EncodeBytes(nil, r.Start)
}
endKey := r.end
if len(r.end) > 0 {
endKey = codec.EncodeBytes(nil, r.end)
endKey := r.End
if len(r.End) > 0 {
endKey = codec.EncodeBytes(nil, r.End)
}
keyRanges = append(keyRanges, &sst.Range{
Start: startKey,

@@ -229,27 +229,27 @@ func TestRangeProperties(t *testing.T) {
return true
})

fullRange := Range{start: []byte("a"), end: []byte("z")}
fullRange := common.Range{Start: []byte("a"), End: []byte("z")}
ranges := splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance*5/2)

require.Equal(t, []Range{
{start: []byte("a"), end: []byte("e")},
{start: []byte("e"), end: []byte("k")},
{start: []byte("k"), end: []byte("mm")},
{start: []byte("mm"), end: []byte("q")},
{start: []byte("q"), end: []byte("z")},
require.Equal(t, []common.Range{
{Start: []byte("a"), End: []byte("e")},
{Start: []byte("e"), End: []byte("k")},
{Start: []byte("k"), End: []byte("mm")},
{Start: []byte("mm"), End: []byte("q")},
{Start: []byte("q"), End: []byte("z")},
}, ranges)

ranges = splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance)
require.Equal(t, []Range{
{start: []byte("a"), end: []byte("e")},
{start: []byte("e"), end: []byte("h")},
{start: []byte("h"), end: []byte("k")},
{start: []byte("k"), end: []byte("m")},
{start: []byte("m"), end: []byte("mm")},
{start: []byte("mm"), end: []byte("n")},
{start: []byte("n"), end: []byte("q")},
{start: []byte("q"), end: []byte("z")},
require.Equal(t, []common.Range{
{Start: []byte("a"), End: []byte("e")},
{Start: []byte("e"), End: []byte("h")},
{Start: []byte("h"), End: []byte("k")},
{Start: []byte("k"), End: []byte("m")},
{Start: []byte("m"), End: []byte("mm")},
{Start: []byte("mm"), End: []byte("n")},
{Start: []byte("n"), End: []byte("q")},
{Start: []byte("q"), End: []byte("z")},
}, ranges)
}

@@ -551,10 +551,10 @@ func TestLocalIngestLoop(t *testing.T) {
require.Equal(t, atomic.LoadInt32(&maxMetaSeq), f.finishedMetaSeq.Load())
}

func makeRanges(input []string) []Range {
ranges := make([]Range, 0, len(input)/2)
func makeRanges(input []string) []common.Range {
ranges := make([]common.Range, 0, len(input)/2)
for i := 0; i < len(input)-1; i += 2 {
ranges = append(ranges, Range{start: []byte(input[i]), end: []byte(input[i+1])})
ranges = append(ranges, common.Range{Start: []byte(input[i]), End: []byte(input[i+1])})
}
return ranges
}
@@ -1258,7 +1258,7 @@ func TestCheckPeersBusy(t *testing.T) {
jobCh := make(chan *regionJob, 10)

retryJob := &regionJob{
keyRange: Range{start: []byte("a"), end: []byte("b")},
keyRange: common.Range{Start: []byte("a"), End: []byte("b")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
@@ -1278,7 +1278,7 @@ func TestCheckPeersBusy(t *testing.T) {
jobCh <- retryJob

jobCh <- &regionJob{
keyRange: Range{start: []byte("b"), end: []byte("")},
keyRange: common.Range{Start: []byte("b"), End: []byte("")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 4,
@@ -1330,8 +1330,8 @@ func TestCheckPeersBusy(t *testing.T) {
// store 12 has a follower busy, so it will break the workflow for region (11, 12, 13)
require.Equal(t, []uint64{11, 12, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"])
// region (11, 12, 13) has key range ["a", "b"), it's not finished.
require.Equal(t, []byte("a"), retryJob.keyRange.start)
require.Equal(t, []byte("b"), retryJob.keyRange.end)
require.Equal(t, []byte("a"), retryJob.keyRange.Start)
require.Equal(t, []byte("b"), retryJob.keyRange.End)
}

func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
@@ -1378,7 +1378,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
jobCh := make(chan *regionJob, 10)

staleJob := &regionJob{
keyRange: Range{start: []byte("a"), end: []byte("")},
keyRange: common.Range{Start: []byte("a"), End: []byte("")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
@@ -1471,7 +1471,7 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) {
jobCh := make(chan *regionJob, 10)

partialWriteJob := &regionJob{
keyRange: Range{start: []byte("a"), end: []byte("c")},
keyRange: common.Range{Start: []byte("a"), End: []byte("c")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
@@ -1578,7 +1578,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
jobCh := make(chan *regionJob, 10)

partialWriteJob := &regionJob{
keyRange: Range{start: []byte("a"), end: []byte("c")},
keyRange: common.Range{Start: []byte("a"), End: []byte("c")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
@@ -1612,8 +1612,8 @@ func TestPartialWriteIngestBusy(t *testing.T) {
jobCh <- job
case ingested:
// partially write will change the start key
require.Equal(t, []byte("a2"), job.keyRange.start)
require.Equal(t, []byte("c"), job.keyRange.end)
require.Equal(t, []byte("a2"), job.keyRange.Start)
require.Equal(t, []byte("c"), job.keyRange.End)
jobWg.Done()
return
default:
@@ -1717,7 +1717,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
require.NoError(t, err)
}

bigRegionRange := []Range{{start: []byte{1}, end: []byte{11}}}
bigRegionRange := []common.Range{{Start: []byte{1}, End: []byte{11}}}
jobCh := make(chan *regionJob, 10)
jobWg := sync.WaitGroup{}
err := local.generateAndSendJob(
@@ -1733,8 +1733,8 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
require.Len(t, jobCh, 10)
for i := 0; i < 10; i++ {
job := <-jobCh
require.Equal(t, []byte{byte(i + 1)}, job.keyRange.start)
require.Equal(t, []byte{byte(i + 2)}, job.keyRange.end)
require.Equal(t, []byte{byte(i + 1)}, job.keyRange.Start)
require.Equal(t, []byte{byte(i + 2)}, job.keyRange.End)
jobWg.Done()
}
jobWg.Wait()
@@ -1793,10 +1793,10 @@ func TestDoImport(t *testing.T) {
// - one job need rescan when ingest
// - one job need retry when write

initRanges := []Range{
{start: []byte{'a'}, end: []byte{'b'}},
{start: []byte{'b'}, end: []byte{'c'}},
{start: []byte{'c'}, end: []byte{'d'}},
initRanges := []common.Range{
{Start: []byte{'a'}, End: []byte{'b'}},
{Start: []byte{'b'}, End: []byte{'c'}},
{Start: []byte{'c'}, End: []byte{'d'}},
}
fakeRegionJobs = map[[2]string]struct {
jobs []*regionJob
@@ -1805,7 +1805,7 @@ func TestDoImport(t *testing.T) {
{"a", "b"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}},
keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1814,7 +1814,7 @@ func TestDoImport(t *testing.T) {
{"b", "c"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'b'}, end: []byte{'c'}},
keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}},
ingestData: &Engine{},
injected: []injectedBehaviour{
{
@@ -1848,12 +1848,12 @@ func TestDoImport(t *testing.T) {
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
},
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: []injectedBehaviour{
{
@@ -1869,7 +1869,7 @@ func TestDoImport(t *testing.T) {
{"c", "c2"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1878,7 +1878,7 @@ func TestDoImport(t *testing.T) {
{"c2", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1910,7 +1910,7 @@ func TestDoImport(t *testing.T) {
{"a", "b"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}},
keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1932,12 +1932,12 @@ func TestDoImport(t *testing.T) {
{"a", "b"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'a'}, end: []byte{'a', '2'}},
keyRange: common.Range{Start: []byte{'a'}, End: []byte{'a', '2'}},
ingestData: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
},
{
keyRange: Range{start: []byte{'a', '2'}, end: []byte{'b'}},
keyRange: common.Range{Start: []byte{'a', '2'}, End: []byte{'b'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1946,7 +1946,7 @@ func TestDoImport(t *testing.T) {
{"b", "c"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'b'}, end: []byte{'c'}},
keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1955,7 +1955,7 @@ func TestDoImport(t *testing.T) {
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -1978,7 +1978,7 @@ func TestDoImport(t *testing.T) {
{"a", "b"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}},
keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
@@ -1988,7 +1988,7 @@ func TestDoImport(t *testing.T) {
{"b", "c"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'b'}, end: []byte{'c'}},
keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
@@ -1998,7 +1998,7 @@ func TestDoImport(t *testing.T) {
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 2,
injected: []injectedBehaviour{
@@ -2038,8 +2038,8 @@ func TestRegionJobResetRetryCounter(t *testing.T) {

// test that job need rescan when ingest

initRanges := []Range{
{start: []byte{'c'}, end: []byte{'d'}},
initRanges := []common.Range{
{Start: []byte{'c'}, End: []byte{'d'}},
}
fakeRegionJobs = map[[2]string]struct {
jobs []*regionJob
@@ -2048,13 +2048,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
},
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
@@ -2064,7 +2064,7 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
{"c", "c2"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -2106,9 +2106,9 @@ func TestCtxCancelIsIgnored(t *testing.T) {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []Range{
{start: []byte{'c'}, end: []byte{'d'}},
{start: []byte{'d'}, end: []byte{'e'}},
initRanges := []common.Range{
{Start: []byte{'c'}, End: []byte{'d'}},
{Start: []byte{'d'}, End: []byte{'e'}},
}
fakeRegionJobs = map[[2]string]struct {
jobs []*regionJob
@@ -2117,7 +2117,7 @@ func TestCtxCancelIsIgnored(t *testing.T) {
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -2126,7 +2126,7 @@ func TestCtxCancelIsIgnored(t *testing.T) {
{"d", "e"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
keyRange: common.Range{Start: []byte{'d'}, End: []byte{'e'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
@@ -2166,6 +2166,8 @@ func TestExternalEngineLoadIngestData(t *testing.T) {
nil,
common.DupDetectOpt{},
123,
0,
0,
)
local := &Backend{
BackendConfig: BackendConfig{
@@ -2175,11 +2177,11 @@ func TestExternalEngineLoadIngestData(t *testing.T) {
keys[0], keys[50], endKey,
}, nil),
}
ranges := []Range{
{start: keys[0], end: keys[30]},
{start: keys[30], end: keys[60]},
{start: keys[60], end: keys[90]},
{start: keys[90], end: endKey},
ranges := []common.Range{
{Start: keys[0], End: keys[30]},
{Start: keys[30], End: keys[60]},
{Start: keys[60], End: keys[90]},
{Start: keys[90], End: endKey},
}
jobToWorkerCh := make(chan *regionJob, 10)
jobWg := new(sync.WaitGroup)
@@ -2199,19 +2201,19 @@ func TestExternalEngineLoadIngestData(t *testing.T) {
jobs = append(jobs, <-jobToWorkerCh)
}
sort.Slice(jobs, func(i, j int) bool {
return bytes.Compare(jobs[i].keyRange.start, jobs[j].keyRange.start) < 0
return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0
})
expectedKeyRanges := []Range{
{start: keys[0], end: keys[30]},
{start: keys[30], end: keys[50]},
{start: keys[50], end: keys[60]},
{start: keys[60], end: keys[90]},
{start: keys[90], end: endKey},
expectedKeyRanges := []common.Range{
{Start: keys[0], End: keys[30]},
{Start: keys[30], End: keys[50]},
{Start: keys[50], End: keys[60]},
{Start: keys[60], End: keys[90]},
{Start: keys[90], End: endKey},
}
kvIdx := 0
for i, job := range jobs {
require.Equal(t, expectedKeyRanges[i], job.keyRange)
iter := job.ingestData.NewIter(ctx, job.keyRange.start, job.keyRange.end)
iter := job.ingestData.NewIter(ctx, job.keyRange.Start, job.keyRange.End)
for iter.First(); iter.Valid(); iter.Next() {
require.Equal(t, keys[kvIdx], iter.Key())
require.Equal(t, values[kvIdx], iter.Value())

@@ -112,7 +112,7 @@ func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tabl
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
func (local *Backend) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
ranges []common.Range,
needSplit bool,
batchCnt int,
) error {
@@ -134,7 +134,7 @@ func (local *Backend) SplitAndScatterRegionInBatches(
// TODO: remove this file and use br internal functions
func (local *Backend) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
ranges []common.Range,
needSplit bool,
) (err error) {
if len(ranges) == 0 {
@@ -150,8 +150,8 @@ func (local *Backend) SplitAndScatterRegionByRanges(
}()
}

minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)
minKey := codec.EncodeBytes([]byte{}, ranges[0].Start)
maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].End)

scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
@@ -192,12 +192,12 @@ func (local *Backend) SplitAndScatterRegionByRanges(
break
}

needSplitRanges := make([]Range, 0, len(ranges))
needSplitRanges := make([]common.Range, 0, len(ranges))
startKey := make([]byte, 0)
endKey := make([]byte, 0)
for _, r := range ranges {
startKey = codec.EncodeBytes(startKey, r.start)
endKey = codec.EncodeBytes(endKey, r.end)
startKey = codec.EncodeBytes(startKey, r.Start)
endKey = codec.EncodeBytes(endKey, r.End)
idx := sort.Search(len(regions), func(i int) bool {
return beforeEnd(startKey, regions[i].Region.EndKey)
})
@@ -514,15 +514,15 @@ func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regio
}
}

func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte {
func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte {
checkKeys := make([][]byte, 0)
var lastEnd []byte
for _, rg := range ranges {
if !bytes.Equal(lastEnd, rg.start) {
checkKeys = append(checkKeys, rg.start)
if !bytes.Equal(lastEnd, rg.Start) {
checkKeys = append(checkKeys, rg.Start)
}
checkKeys = append(checkKeys, rg.end)
lastEnd = rg.end
checkKeys = append(checkKeys, rg.End)
lastEnd = rg.End
}
return getSplitKeys(checkKeys, regions, logger)
}
@@ -588,22 +588,22 @@ func keyInsideRegion(region *metapb.Region, key []byte) bool {
return bytes.Compare(key, region.GetStartKey()) >= 0 && (beforeEnd(key, region.GetEndKey()))
}

func intersectRange(region *metapb.Region, rg Range) Range {
func intersectRange(region *metapb.Region, rg common.Range) common.Range {
var startKey, endKey []byte
if len(region.StartKey) > 0 {
_, startKey, _ = codec.DecodeBytes(region.StartKey, []byte{})
}
if bytes.Compare(startKey, rg.start) < 0 {
startKey = rg.start
if bytes.Compare(startKey, rg.Start) < 0 {
startKey = rg.Start
}
if len(region.EndKey) > 0 {
_, endKey, _ = codec.DecodeBytes(region.EndKey, []byte{})
}
if beforeEnd(rg.end, endKey) {
endKey = rg.end
if beforeEnd(rg.End, endKey) {
endKey = rg.End
}

return Range{start: startKey, end: endKey}
return common.Range{Start: startKey, End: endKey}
}

// StoreWriteLimiter is used to limit the write rate of a store.

@@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/kv"
@@ -469,11 +470,11 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})

// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
ranges := make([]Range, 0)
ranges := make([]common.Range, 0)
start := []byte{'b'}
for i := byte('a'); i <= 'z'; i++ {
end := []byte{'b', i}
ranges = append(ranges, Range{start: start, end: end})
ranges = append(ranges, common.Range{Start: start, End: end})
start = end
}

@@ -559,11 +560,11 @@ func TestMissingScatter(t *testing.T) {
checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})

// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
ranges := make([]Range, 0)
ranges := make([]common.Range, 0)
start := []byte{'b'}
for i := byte('a'); i <= 'z'; i++ {
end := []byte{'b', i}
ranges = append(ranges, Range{start: start, end: end})
ranges = append(ranges, common.Range{Start: start, End: end})
start = end
}

@@ -722,11 +723,11 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var ranges []Range
var ranges []common.Range
for i := 0; i < 20; i++ {
ranges = append(ranges, Range{
start: []byte(fmt.Sprintf("a%02d", i)),
end: []byte(fmt.Sprintf("a%02d", i+1)),
ranges = append(ranges, common.Range{
Start: []byte(fmt.Sprintf("a%02d", i)),
End: []byte(fmt.Sprintf("a%02d", i+1)),
})
}

@@ -820,9 +821,9 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
}

start := rangeKeys[0]
ranges := make([]Range, 0, len(rangeKeys)-1)
ranges := make([]common.Range, 0, len(rangeKeys)-1)
for _, e := range rangeKeys[1:] {
ranges = append(ranges, Range{start: start, end: e})
ranges = append(ranges, common.Range{Start: start, End: e})
start = e
}

@@ -94,7 +94,7 @@ func (j jobStageTp) String() string {
// to a region. The keyRange may be changed when processing because of writing
// partial data to TiKV or region split.
type regionJob struct {
keyRange Range
keyRange common.Range
// TODO: check the keyRange so that it's always included in region
region *split.RegionInfo
// stage should be updated only by convertStageTo
@@ -189,16 +189,16 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
begin := time.Now()
region := j.region.Region

firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.start, j.keyRange.end)
firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.Start, j.keyRange.End)
if err != nil {
return errors.Trace(err)
}
if firstKey == nil {
j.convertStageTo(ingested)
log.FromContext(ctx).Debug("keys within region is empty, skip doIngest",
logutil.Key("start", j.keyRange.start),
logutil.Key("start", j.keyRange.Start),
logutil.Key("regionStart", region.StartKey),
logutil.Key("end", j.keyRange.end),
logutil.Key("end", j.keyRange.End),
logutil.Key("regionEnd", region.EndKey))
return nil
}
@@ -298,7 +298,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
return nil
}

iter := j.ingestData.NewIter(ctx, j.keyRange.start, j.keyRange.end)
iter := j.ingestData.NewIter(ctx, j.keyRange.Start, j.keyRange.End)
//nolint: errcheck
defer iter.Close()

@@ -336,8 +336,8 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
log.FromContext(ctx).Info("write to tikv partial finish",
zap.Int64("count", totalCount),
zap.Int64("size", totalSize),
logutil.Key("startKey", j.keyRange.start),
logutil.Key("endKey", j.keyRange.end),
logutil.Key("startKey", j.keyRange.Start),
logutil.Key("endKey", j.keyRange.End),
logutil.Key("remainStart", remainingStartKey),
logutil.Region(region),
logutil.Leader(j.region.Leader))
@@ -464,15 +464,15 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) {
zap.Stringer("job stage", j.stage),
logutil.ShortError(j.lastRetryableErr),
j.region.ToZapFields(),
logutil.Key("start", j.keyRange.start),
logutil.Key("end", j.keyRange.end))
logutil.Key("start", j.keyRange.Start),
logutil.Key("end", j.keyRange.End))
return nil
}
log.FromContext(ctx).Warn("meet error and will doIngest region again",
logutil.ShortError(j.lastRetryableErr),
j.region.ToZapFields(),
logutil.Key("start", j.keyRange.start),
logutil.Key("end", j.keyRange.end))
logutil.Key("start", j.keyRange.Start),
logutil.Key("end", j.keyRange.End))
}
return nil
}

@@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/stretchr/testify/require"
)
@@ -56,9 +57,9 @@ func TestIsIngestRetryable(t *testing.T) {
}
job := regionJob{
stage: wrote,
keyRange: Range{
start: []byte{1},
end: []byte{3},
keyRange: common.Range{
Start: []byte{1},
End: []byte{3},
},
region: region,
writeResult: &tikvWriteResult{
@@ -206,8 +207,8 @@ func TestRegionJobRetryer(t *testing.T) {
}

job := &regionJob{
keyRange: Range{
start: []byte("123"),
keyRange: common.Range{
Start: []byte("123"),
},
waitUntil: time.Now().Add(-time.Second),
}
@@ -235,8 +236,8 @@ func TestRegionJobRetryer(t *testing.T) {
retryer = startRegionJobRetryer(ctx, putBackCh, &jobWg)

job = &regionJob{
keyRange: Range{
start: []byte("123"),
keyRange: common.Range{
Start: []byte("123"),
},
waitUntil: time.Now().Add(-time.Second),
}
@@ -246,8 +247,8 @@ func TestRegionJobRetryer(t *testing.T) {
time.Sleep(3 * time.Second)
// now retryer is sending to putBackCh, but putBackCh is blocked
job = &regionJob{
keyRange: Range{
start: []byte("456"),
keyRange: common.Range{
Start: []byte("456"),
},
waitUntil: time.Now().Add(-time.Second),
}

@@ -14,12 +14,30 @@

package common

import "context"
import (
"context"

"github.com/pingcap/tidb/br/pkg/lightning/log"
)

// Range contains a start key and an end key.
type Range struct {
Start []byte
End []byte // end is always exclusive except import_sstpb.SSTMeta
}

// Engine describes the common interface of local and external engine that
// local backend uses.
type Engine interface {
// ID is the identifier of an engine.
ID() string
// LoadIngestData returns an IngestData that contains the data in [start, end).
LoadIngestData(ctx context.Context, start, end []byte) (IngestData, error)
// KVStatistics returns the total kv size and total kv length.
KVStatistics() (totalKVSize int64, totalKVLength int64)
// ImportedStatistics returns the imported kv size and imported kv length.
ImportedStatistics() (importedKVSize int64, importedKVLength int64)
// SplitRanges splits the range [startKey, endKey) into multiple ranges.
SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]Range, error)
// TODO(lance6716): add more methods
}
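To show how the backend can treat both engines uniformly through this interface, here is a trimmed-down sketch (editor's illustration, not code from the commit; string keys and [2]string ranges replace []byte and Range, and only the split-related methods are modeled): the external stub splits on precomputed keys and ignores the limits, while the local stub stands in for the size-properties split.

package main

import "fmt"

// rangeSplitter is a trimmed-down stand-in for the common.Engine interface above.
type rangeSplitter interface {
    ID() string
    SplitRanges(start, end string, sizeLimit, keysLimit int64) ([][2]string, error)
}

// externalStub mimics the external engine: split points were precomputed
// during the merge-sort phase, so the size/keys limits are ignored.
type externalStub struct{ splitKeys []string }

func (externalStub) ID() string { return "external" }

func (e externalStub) SplitRanges(start, end string, _, _ int64) ([][2]string, error) {
    ranges := [][2]string{}
    cur := start
    for _, k := range e.splitKeys {
        ranges = append(ranges, [2]string{cur, k})
        cur = k
    }
    return append(ranges, [2]string{cur, end}), nil
}

// localStub mimics the local engine: a real implementation consults pebble
// size properties (see splitRangeBySizeProps above); this stub returns one range.
type localStub struct{}

func (localStub) ID() string { return "local" }

func (localStub) SplitRanges(start, end string, sizeLimit, keysLimit int64) ([][2]string, error) {
    return [][2]string{{start, end}}, nil
}

func main() {
    // The import path can drive either engine through the same interface.
    engines := []rangeSplitter{
        externalStub{splitKeys: []string{"b", "c"}},
        localStub{},
    }
    for _, eng := range engines {
        ranges, _ := eng.SplitRanges("a", "d", 1<<20, 1<<10)
        fmt.Println(eng.ID(), ranges)
    }
}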