diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
index ca81011eb7..7f6462117e 100644
--- a/br/pkg/lightning/backend/external/engine.go
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -38,6 +38,7 @@ type Engine struct {
 	storage    storage.ExternalStorage
 	dataFiles  []string
 	statsFiles []string
+	splitKeys  [][]byte
 	bufPool    *membuf.Pool
 
 	iter *MergeKVIter
@@ -48,6 +49,9 @@ type Engine struct {
 	dupDetectOpt common.DupDetectOpt
 	ts           uint64
 
+	totalKVSize   int64
+	totalKVLength int64
+
 	importedKVSize  *atomic.Int64
 	importedKVCount *atomic.Int64
 }
@@ -62,6 +66,8 @@ func NewExternalEngine(
 	duplicateDB *pebble.DB,
 	dupDetectOpt common.DupDetectOpt,
 	ts uint64,
+	totalKVSize int64,
+	totalKVLength int64,
 ) common.Engine {
 	return &Engine{
 		storage: storage,
@@ -73,6 +79,8 @@ func NewExternalEngine(
 		duplicateDB:     duplicateDB,
 		dupDetectOpt:    dupDetectOpt,
 		ts:              ts,
+		totalKVSize:     totalKVSize,
+		totalKVLength:   totalKVLength,
 		importedKVSize:  atomic.NewInt64(0),
 		importedKVCount: atomic.NewInt64(0),
 	}
 }
@@ -169,6 +177,42 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
 	return iter, nil
 }
 
+// KVStatistics returns the total kv size and total kv length.
+func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) {
+	return e.totalKVSize, e.totalKVLength
+}
+
+// ImportedStatistics returns the imported kv size and imported kv length.
+func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) {
+	return e.importedKVSize.Load(), e.importedKVCount.Load()
+}
+
+// ID is the identifier of an engine.
+func (e *Engine) ID() string {
+	return "external"
+}
+
+// SplitRanges splits the range [startKey, endKey) by the split keys provided by the external engine.
+func (e *Engine) SplitRanges(
+	startKey, endKey []byte,
+	_, _ int64,
+	_ log.Logger,
+) ([]common.Range, error) {
+	splitKeys := e.splitKeys
+	ranges := make([]common.Range, 0, len(splitKeys)+1)
+	ranges = append(ranges, common.Range{Start: startKey})
+	for i := 0; i < len(splitKeys); i++ {
+		ranges[len(ranges)-1].End = splitKeys[i]
+		var endK []byte
+		if i < len(splitKeys)-1 {
+			endK = splitKeys[i+1]
+		}
+		ranges = append(ranges, common.Range{Start: splitKeys[i], End: endK})
+	}
+	ranges[len(ranges)-1].End = endKey
+	return ranges, nil
+}
+
 // Close releases the resources of the engine.
func (e *Engine) Close() error { if e.iter == nil { diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 4fe01f940c..c99ff5d228 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -673,13 +673,13 @@ func (m *DupeDetector) splitLocalDupTaskByKeys( if err != nil { return nil, errors.Trace(err) } - ranges := splitRangeBySizeProps(Range{start: task.StartKey, end: task.EndKey}, sizeProps, sizeLimit, keysLimit) + ranges := splitRangeBySizeProps(common.Range{Start: task.StartKey, End: task.EndKey}, sizeProps, sizeLimit, keysLimit) newDupTasks := make([]dupTask, 0, len(ranges)) for _, r := range ranges { newDupTasks = append(newDupTasks, dupTask{ KeyRange: tidbkv.KeyRange{ - StartKey: r.start, - EndKey: r.end, + StartKey: r.Start, + EndKey: r.End, }, tableID: task.tableID, indexInfo: task.indexInfo, diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 7e1b6ee354..3e857f76bd 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -87,10 +87,10 @@ type engineMeta struct { type syncedRanges struct { sync.Mutex - ranges []Range + ranges []common.Range } -func (r *syncedRanges) add(g Range) { +func (r *syncedRanges) add(g common.Range) { r.Lock() r.ranges = append(r.ranges, g) r.Unlock() @@ -275,6 +275,41 @@ func (e *Engine) TotalMemorySize() int64 { return memSize } +// KVStatistics returns the total kv size and total kv length. +func (e *Engine) KVStatistics() (totalKVSize int64, totalKVLength int64) { + return e.TotalSize.Load(), e.Length.Load() +} + +// ImportedStatistics returns the imported kv size and imported kv length. +func (e *Engine) ImportedStatistics() (importedKVSize int64, importedKVLength int64) { + return e.importedKVSize.Load(), e.importedKVCount.Load() +} + +// ID is the identifier of an engine. +func (e *Engine) ID() string { + return e.UUID.String() +} + +// SplitRanges gets size properties from pebble and split ranges according to size/keys limit. +func (e *Engine) SplitRanges( + startKey, endKey []byte, + sizeLimit, keysLimit int64, + logger log.Logger, +) ([]common.Range, error) { + sizeProps, err := getSizePropertiesFn(logger, e.getDB(), e.keyAdapter) + if err != nil { + return nil, errors.Trace(err) + } + + ranges := splitRangeBySizeProps( + common.Range{Start: startKey, End: endKey}, + sizeProps, + sizeLimit, + keysLimit, + ) + return ranges, nil +} + type rangeOffsets struct { Size uint64 Keys uint64 diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 6a28cba4c9..9474273f31 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -233,13 +233,6 @@ func (c loggingConn) Write(b []byte) (int, error) { return c.Conn.Write(b) } -// Range record start and end key for localStoreDir.DB -// so we can write it to tikv in streaming -type Range struct { - start []byte - end []byte // end is always exclusive except import_sstpb.SSTMeta -} - type encodingBuilder struct { metrics *metric.Metrics } @@ -985,23 +978,23 @@ func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst. 
return local.importClientFactory.Create(ctx, storeID) } -func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range { - ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit)) +func splitRangeBySizeProps(fullRange common.Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []common.Range { + ranges := make([]common.Range, 0, sizeProps.totalSize/uint64(sizeLimit)) curSize := uint64(0) curKeys := uint64(0) - curKey := fullRange.start + curKey := fullRange.Start sizeProps.iter(func(p *rangeProperty) bool { if bytes.Compare(p.Key, curKey) <= 0 { return true } - if bytes.Compare(p.Key, fullRange.end) > 0 { + if bytes.Compare(p.Key, fullRange.End) > 0 { return false } curSize += p.Size curKeys += p.Keys if int64(curSize) >= sizeLimit || int64(curKeys) >= keysLimit { - ranges = append(ranges, Range{start: curKey, end: p.Key}) + ranges = append(ranges, common.Range{Start: curKey, End: p.Key}) curKey = p.Key curSize = 0 curKeys = 0 @@ -1009,23 +1002,23 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit return true }) - if bytes.Compare(curKey, fullRange.end) < 0 { + if bytes.Compare(curKey, fullRange.End) < 0 { // If the remaining range is too small, append it to last range. if len(ranges) > 0 && curKeys == 0 { - ranges[len(ranges)-1].end = fullRange.end + ranges[len(ranges)-1].End = fullRange.End } else { - ranges = append(ranges, Range{start: curKey, end: fullRange.end}) + ranges = append(ranges, common.Range{Start: curKey, End: fullRange.End}) } } return ranges } -func (local *Backend) readAndSplitIntoRange( +func readAndSplitIntoRange( ctx context.Context, engine *Engine, sizeLimit int64, keysLimit int64, -) ([]Range, error) { +) ([]common.Range, error) { firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil) if err != nil { return nil, err @@ -1036,29 +1029,20 @@ func (local *Backend) readAndSplitIntoRange( endKey := nextKey(lastKey) - engineFileTotalSize := engine.TotalSize.Load() - engineFileLength := engine.Length.Load() + engineFileTotalSize, engineFileLength := engine.KVStatistics() if engineFileTotalSize <= sizeLimit && engineFileLength <= keysLimit { - ranges := []Range{{start: firstKey, end: endKey}} + ranges := []common.Range{{Start: firstKey, End: endKey}} return ranges, nil } - logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID)) - sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) - if err != nil { - return nil, errors.Trace(err) - } - - ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps, - sizeLimit, keysLimit) - - logger.Info("split engine key ranges", + logger := log.FromContext(ctx).With(zap.String("engine", engine.ID())) + ranges, err := engine.SplitRanges(firstKey, endKey, sizeLimit, keysLimit, logger) + logger.Info("split local engine key ranges", zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength), logutil.Key("firstKey", firstKey), logutil.Key("lastKey", lastKey), - zap.Int("ranges", len(ranges))) - - return ranges, nil + zap.Int("ranges", len(ranges)), zap.Error(err)) + return ranges, err } // prepareAndSendJob will read the engine to get estimated key range, @@ -1069,14 +1053,13 @@ func (local *Backend) readAndSplitIntoRange( // seize the "first" error. 
func (local *Backend) prepareAndSendJob( ctx context.Context, - engine *Engine, - initialSplitRanges []Range, + engine common.Engine, + initialSplitRanges []common.Range, regionSplitSize, regionSplitKeys int64, jobToWorkerCh chan<- *regionJob, jobWg *sync.WaitGroup, ) error { - lfTotalSize := engine.TotalSize.Load() - lfLength := engine.Length.Load() + lfTotalSize, lfLength := engine.KVStatistics() log.FromContext(ctx).Info("import engine ranges", zap.Int("count", len(initialSplitRanges))) if len(initialSplitRanges) == 0 { return nil @@ -1090,7 +1073,7 @@ func (local *Backend) prepareAndSendJob( failpoint.Inject("failToSplit", func(_ failpoint.Value) { needSplit = true }) - logger := log.FromContext(ctx).With(zap.Stringer("uuid", engine.UUID)).Begin(zap.InfoLevel, "split and scatter ranges") + logger := log.FromContext(ctx).With(zap.String("uuid", engine.ID())).Begin(zap.InfoLevel, "split and scatter ranges") for i := 0; i < maxRetryTimes; i++ { failpoint.Inject("skipSplitAndScatter", func() { failpoint.Break() @@ -1101,7 +1084,7 @@ func (local *Backend) prepareAndSendJob( break } - log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engine.UUID), + log.FromContext(ctx).Warn("split and scatter failed in retry", zap.String("engine ID", engine.ID()), log.ShortError(err), zap.Int("retry", i)) } logger.End(zap.ErrorLevel, err) @@ -1124,7 +1107,7 @@ func (local *Backend) prepareAndSendJob( func (local *Backend) generateAndSendJob( ctx context.Context, engine common.Engine, - jobRanges []Range, + jobRanges []common.Range, regionSplitSize, regionSplitKeys int64, jobToWorkerCh chan<- *regionJob, jobWg *sync.WaitGroup, @@ -1136,16 +1119,15 @@ func (local *Backend) generateAndSendJob( // when use dynamic region feature, the region may be very big, we need // to split to smaller ranges to increase the concurrency. if regionSplitSize > 2*int64(config.SplitRegionSize) && ok { - sizeProps, err := getSizePropertiesFn(logger, localEngine.getDB(), local.keyAdapter) + start := jobRanges[0].Start + end := jobRanges[len(jobRanges)-1].End + sizeLimit := int64(config.SplitRegionSize) + keysLimit := int64(config.SplitRegionKeys) + jrs, err := localEngine.SplitRanges(start, end, sizeLimit, keysLimit, logger) if err != nil { return errors.Trace(err) } - - jobRanges = splitRangeBySizeProps( - Range{start: jobRanges[0].start, end: jobRanges[len(jobRanges)-1].end}, - sizeProps, - int64(config.SplitRegionSize), - int64(config.SplitRegionKeys)) + jobRanges = jrs } logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) @@ -1155,7 +1137,7 @@ func (local *Backend) generateAndSendJob( eg.SetLimit(local.WorkerConcurrency) for _, jobRange := range jobRanges { r := jobRange - data, err := engine.LoadIngestData(ctx, r.start, r.end) + data, err := engine.LoadIngestData(ctx, r.Start, r.End) if err != nil { cancel() err2 := eg.Wait() @@ -1207,14 +1189,14 @@ var fakeRegionJobs map[[2]string]struct { func (local *Backend) generateJobForRange( ctx context.Context, data common.IngestData, - keyRange Range, + keyRange common.Range, regionSplitSize, regionSplitKeys int64, ) ([]*regionJob, error) { failpoint.Inject("fakeRegionJobs", func() { if ctx.Err() != nil { failpoint.Return(nil, ctx.Err()) } - key := [2]string{string(keyRange.start), string(keyRange.end)} + key := [2]string{string(keyRange.Start), string(keyRange.End)} injected := fakeRegionJobs[key] // overwrite the stage to regionScanned, because some time same keyRange // will be generated more than once. 
@@ -1224,7 +1206,7 @@ func (local *Backend) generateJobForRange( failpoint.Return(injected.jobs, injected.err) }) - start, end := keyRange.start, keyRange.end + start, end := keyRange.Start, keyRange.End pairStart, pairEnd, err := data.GetFirstAndLastKey(start, end) if err != nil { return nil, err @@ -1259,7 +1241,7 @@ func (local *Backend) generateJobForRange( zap.Reflect("peers", region.Region.GetPeers())) jobs = append(jobs, ®ionJob{ - keyRange: intersectRange(region.Region, Range{start: start, end: end}), + keyRange: intersectRange(region.Region, common.Range{Start: start, End: end}), region: region, stage: regionScanned, ingestData: data, @@ -1411,7 +1393,7 @@ func (local *Backend) executeJob( if job.writeResult == nil || job.writeResult.remainingStartKey == nil { return nil } - job.keyRange.start = job.writeResult.remainingStartKey + job.keyRange.Start = job.writeResult.remainingStartKey job.convertStageTo(regionScanned) } } @@ -1425,8 +1407,7 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re } defer lf.unlock() - lfTotalSize := lf.TotalSize.Load() - lfLength := lf.Length.Load() + lfTotalSize, lfLength := lf.KVStatistics() if lfTotalSize == 0 { // engine is empty, this is likes because it's a index engine but the table contains no index log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) @@ -1445,7 +1426,7 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re } // split sorted file into range about regionSplitSize per file - regionRanges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys) + regionRanges, err := readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys) if err != nil { return err } @@ -1456,11 +1437,11 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re defer cancel() var startKey, endKey []byte - if len(regionRanges[0].start) > 0 { - startKey = codec.EncodeBytes(nil, regionRanges[0].start) + if len(regionRanges[0].Start) > 0 { + startKey = codec.EncodeBytes(nil, regionRanges[0].Start) } - if len(regionRanges[len(regionRanges)-1].end) > 0 { - endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].end) + if len(regionRanges[len(regionRanges)-1].End) > 0 { + endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].End) } done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey) if err != nil { @@ -1474,8 +1455,8 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re if len(regionRanges) > 0 && local.BackendConfig.RaftKV2SwitchModeDuration > 0 { log.FromContext(ctx).Info("switch import mode of ranges", - zap.String("startKey", hex.EncodeToString(regionRanges[0].start)), - zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].end))) + zap.String("startKey", hex.EncodeToString(regionRanges[0].Start)), + zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].End))) subCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -1499,17 +1480,18 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re err = local.doImport(ctx, lf, regionRanges, regionSplitSize, regionSplitKeys) if err == nil { + importedSize, importedLength := lf.ImportedStatistics() log.FromContext(ctx).Info("import engine success", zap.Stringer("uuid", engineUUID), zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength), - zap.Int64("importedSize", lf.importedKVSize.Load()), - 
zap.Int64("importedCount", lf.importedKVCount.Load())) + zap.Int64("importedSize", importedSize), + zap.Int64("importedCount", importedLength)) } return err } -func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges []Range, regionSplitSize, regionSplitKeys int64) error { +func (local *Backend) doImport(ctx context.Context, engine common.Engine, regionRanges []common.Range, regionSplitSize, regionSplitKeys int64) error { /* [prepareAndSendJob]-----jobToWorkerCh--->[workers] ^ | @@ -1571,8 +1553,8 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges } job.waitUntil = time.Now().Add(time.Second * time.Duration(sleepSecond)) log.FromContext(ctx).Info("put job back to jobCh to retry later", - logutil.Key("startKey", job.keyRange.start), - logutil.Key("endKey", job.keyRange.end), + logutil.Key("startKey", job.keyRange.Start), + logutil.Key("endKey", job.keyRange.End), zap.Stringer("stage", job.stage), zap.Int("retryCount", job.retryCount), zap.Time("waitUntil", job.waitUntil)) @@ -1741,19 +1723,19 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon // SwitchModeByKeyRanges will switch tikv mode for regions in the specific key range for multirocksdb. // This function will spawn a goroutine to keep switch mode periodically until the context is done. // The return done channel is used to notify the caller that the background goroutine is exited. -func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []Range) (<-chan struct{}, error) { +func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error) { switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger) done := make(chan struct{}) keyRanges := make([]*sst.Range, 0, len(ranges)) for _, r := range ranges { - startKey := r.start - if len(r.start) > 0 { - startKey = codec.EncodeBytes(nil, r.start) + startKey := r.Start + if len(r.Start) > 0 { + startKey = codec.EncodeBytes(nil, r.Start) } - endKey := r.end - if len(r.end) > 0 { - endKey = codec.EncodeBytes(nil, r.end) + endKey := r.End + if len(r.End) > 0 { + endKey = codec.EncodeBytes(nil, r.End) } keyRanges = append(keyRanges, &sst.Range{ Start: startKey, diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index be34478736..88c8e7e95e 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -229,27 +229,27 @@ func TestRangeProperties(t *testing.T) { return true }) - fullRange := Range{start: []byte("a"), end: []byte("z")} + fullRange := common.Range{Start: []byte("a"), End: []byte("z")} ranges := splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance*5/2) - require.Equal(t, []Range{ - {start: []byte("a"), end: []byte("e")}, - {start: []byte("e"), end: []byte("k")}, - {start: []byte("k"), end: []byte("mm")}, - {start: []byte("mm"), end: []byte("q")}, - {start: []byte("q"), end: []byte("z")}, + require.Equal(t, []common.Range{ + {Start: []byte("a"), End: []byte("e")}, + {Start: []byte("e"), End: []byte("k")}, + {Start: []byte("k"), End: []byte("mm")}, + {Start: []byte("mm"), End: []byte("q")}, + {Start: []byte("q"), End: []byte("z")}, }, ranges) ranges = splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance) - require.Equal(t, []Range{ - {start: []byte("a"), end: []byte("e")}, - {start: []byte("e"), 
end: []byte("h")}, - {start: []byte("h"), end: []byte("k")}, - {start: []byte("k"), end: []byte("m")}, - {start: []byte("m"), end: []byte("mm")}, - {start: []byte("mm"), end: []byte("n")}, - {start: []byte("n"), end: []byte("q")}, - {start: []byte("q"), end: []byte("z")}, + require.Equal(t, []common.Range{ + {Start: []byte("a"), End: []byte("e")}, + {Start: []byte("e"), End: []byte("h")}, + {Start: []byte("h"), End: []byte("k")}, + {Start: []byte("k"), End: []byte("m")}, + {Start: []byte("m"), End: []byte("mm")}, + {Start: []byte("mm"), End: []byte("n")}, + {Start: []byte("n"), End: []byte("q")}, + {Start: []byte("q"), End: []byte("z")}, }, ranges) } @@ -551,10 +551,10 @@ func TestLocalIngestLoop(t *testing.T) { require.Equal(t, atomic.LoadInt32(&maxMetaSeq), f.finishedMetaSeq.Load()) } -func makeRanges(input []string) []Range { - ranges := make([]Range, 0, len(input)/2) +func makeRanges(input []string) []common.Range { + ranges := make([]common.Range, 0, len(input)/2) for i := 0; i < len(input)-1; i += 2 { - ranges = append(ranges, Range{start: []byte(input[i]), end: []byte(input[i+1])}) + ranges = append(ranges, common.Range{Start: []byte(input[i]), End: []byte(input[i+1])}) } return ranges } @@ -1258,7 +1258,7 @@ func TestCheckPeersBusy(t *testing.T) { jobCh := make(chan *regionJob, 10) retryJob := ®ionJob{ - keyRange: Range{start: []byte("a"), end: []byte("b")}, + keyRange: common.Range{Start: []byte("a"), End: []byte("b")}, region: &split.RegionInfo{ Region: &metapb.Region{ Id: 1, @@ -1278,7 +1278,7 @@ func TestCheckPeersBusy(t *testing.T) { jobCh <- retryJob jobCh <- ®ionJob{ - keyRange: Range{start: []byte("b"), end: []byte("")}, + keyRange: common.Range{Start: []byte("b"), End: []byte("")}, region: &split.RegionInfo{ Region: &metapb.Region{ Id: 4, @@ -1330,8 +1330,8 @@ func TestCheckPeersBusy(t *testing.T) { // store 12 has a follower busy, so it will break the workflow for region (11, 12, 13) require.Equal(t, []uint64{11, 12, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) // region (11, 12, 13) has key range ["a", "b"), it's not finished. 
- require.Equal(t, []byte("a"), retryJob.keyRange.start) - require.Equal(t, []byte("b"), retryJob.keyRange.end) + require.Equal(t, []byte("a"), retryJob.keyRange.Start) + require.Equal(t, []byte("b"), retryJob.keyRange.End) } func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { @@ -1378,7 +1378,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { jobCh := make(chan *regionJob, 10) staleJob := ®ionJob{ - keyRange: Range{start: []byte("a"), end: []byte("")}, + keyRange: common.Range{Start: []byte("a"), End: []byte("")}, region: &split.RegionInfo{ Region: &metapb.Region{ Id: 1, @@ -1471,7 +1471,7 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) { jobCh := make(chan *regionJob, 10) partialWriteJob := ®ionJob{ - keyRange: Range{start: []byte("a"), end: []byte("c")}, + keyRange: common.Range{Start: []byte("a"), End: []byte("c")}, region: &split.RegionInfo{ Region: &metapb.Region{ Id: 1, @@ -1578,7 +1578,7 @@ func TestPartialWriteIngestBusy(t *testing.T) { jobCh := make(chan *regionJob, 10) partialWriteJob := ®ionJob{ - keyRange: Range{start: []byte("a"), end: []byte("c")}, + keyRange: common.Range{Start: []byte("a"), End: []byte("c")}, region: &split.RegionInfo{ Region: &metapb.Region{ Id: 1, @@ -1612,8 +1612,8 @@ func TestPartialWriteIngestBusy(t *testing.T) { jobCh <- job case ingested: // partially write will change the start key - require.Equal(t, []byte("a2"), job.keyRange.start) - require.Equal(t, []byte("c"), job.keyRange.end) + require.Equal(t, []byte("a2"), job.keyRange.Start) + require.Equal(t, []byte("c"), job.keyRange.End) jobWg.Done() return default: @@ -1717,7 +1717,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { require.NoError(t, err) } - bigRegionRange := []Range{{start: []byte{1}, end: []byte{11}}} + bigRegionRange := []common.Range{{Start: []byte{1}, End: []byte{11}}} jobCh := make(chan *regionJob, 10) jobWg := sync.WaitGroup{} err := local.generateAndSendJob( @@ -1733,8 +1733,8 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { require.Len(t, jobCh, 10) for i := 0; i < 10; i++ { job := <-jobCh - require.Equal(t, []byte{byte(i + 1)}, job.keyRange.start) - require.Equal(t, []byte{byte(i + 2)}, job.keyRange.end) + require.Equal(t, []byte{byte(i + 1)}, job.keyRange.Start) + require.Equal(t, []byte{byte(i + 2)}, job.keyRange.End) jobWg.Done() } jobWg.Wait() @@ -1793,10 +1793,10 @@ func TestDoImport(t *testing.T) { // - one job need rescan when ingest // - one job need retry when write - initRanges := []Range{ - {start: []byte{'a'}, end: []byte{'b'}}, - {start: []byte{'b'}, end: []byte{'c'}}, - {start: []byte{'c'}, end: []byte{'d'}}, + initRanges := []common.Range{ + {Start: []byte{'a'}, End: []byte{'b'}}, + {Start: []byte{'b'}, End: []byte{'c'}}, + {Start: []byte{'c'}, End: []byte{'d'}}, } fakeRegionJobs = map[[2]string]struct { jobs []*regionJob @@ -1805,7 +1805,7 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1814,7 +1814,7 @@ func TestDoImport(t *testing.T) { {"b", "c"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}}, ingestData: &Engine{}, injected: []injectedBehaviour{ { @@ -1848,12 +1848,12 @@ func TestDoImport(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + keyRange: 
common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}}, ingestData: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), }, { - keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: []injectedBehaviour{ { @@ -1869,7 +1869,7 @@ func TestDoImport(t *testing.T) { {"c", "c2"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1878,7 +1878,7 @@ func TestDoImport(t *testing.T) { {"c2", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1910,7 +1910,7 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1932,12 +1932,12 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'a', '2'}}, + keyRange: common.Range{Start: []byte{'a'}, End: []byte{'a', '2'}}, ingestData: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), }, { - keyRange: Range{start: []byte{'a', '2'}, end: []byte{'b'}}, + keyRange: common.Range{Start: []byte{'a', '2'}, End: []byte{'b'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1946,7 +1946,7 @@ func TestDoImport(t *testing.T) { {"b", "c"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1955,7 +1955,7 @@ func TestDoImport(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -1978,7 +1978,7 @@ func TestDoImport(t *testing.T) { {"a", "b"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, + keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}}, ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), @@ -1988,7 +1988,7 @@ func TestDoImport(t *testing.T) { {"b", "c"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, + keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}}, ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), @@ -1998,7 +1998,7 @@ func TestDoImport(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}}, ingestData: &Engine{}, retryCount: maxWriteAndIngestRetryTimes - 2, injected: []injectedBehaviour{ @@ -2038,8 +2038,8 @@ func TestRegionJobResetRetryCounter(t *testing.T) { // test that job need rescan when ingest - initRanges := []Range{ - {start: []byte{'c'}, end: []byte{'d'}}, + initRanges := []common.Range{ + {Start: []byte{'c'}, End: []byte{'d'}}, } fakeRegionJobs = map[[2]string]struct { jobs []*regionJob @@ -2048,13 +2048,13 @@ func 
TestRegionJobResetRetryCounter(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}}, ingestData: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), retryCount: maxWriteAndIngestRetryTimes, }, { - keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), retryCount: maxWriteAndIngestRetryTimes, @@ -2064,7 +2064,7 @@ func TestRegionJobResetRetryCounter(t *testing.T) { {"c", "c2"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -2106,9 +2106,9 @@ func TestCtxCancelIsIgnored(t *testing.T) { _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") }) - initRanges := []Range{ - {start: []byte{'c'}, end: []byte{'d'}}, - {start: []byte{'d'}, end: []byte{'e'}}, + initRanges := []common.Range{ + {Start: []byte{'c'}, End: []byte{'d'}}, + {Start: []byte{'d'}, End: []byte{'e'}}, } fakeRegionJobs = map[[2]string]struct { jobs []*regionJob @@ -2117,7 +2117,7 @@ func TestCtxCancelIsIgnored(t *testing.T) { {"c", "d"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -2126,7 +2126,7 @@ func TestCtxCancelIsIgnored(t *testing.T) { {"d", "e"}: { jobs: []*regionJob{ { - keyRange: Range{start: []byte{'d'}, end: []byte{'e'}}, + keyRange: common.Range{Start: []byte{'d'}, End: []byte{'e'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), }, @@ -2166,6 +2166,8 @@ func TestExternalEngineLoadIngestData(t *testing.T) { nil, common.DupDetectOpt{}, 123, + 0, + 0, ) local := &Backend{ BackendConfig: BackendConfig{ @@ -2175,11 +2177,11 @@ func TestExternalEngineLoadIngestData(t *testing.T) { keys[0], keys[50], endKey, }, nil), } - ranges := []Range{ - {start: keys[0], end: keys[30]}, - {start: keys[30], end: keys[60]}, - {start: keys[60], end: keys[90]}, - {start: keys[90], end: endKey}, + ranges := []common.Range{ + {Start: keys[0], End: keys[30]}, + {Start: keys[30], End: keys[60]}, + {Start: keys[60], End: keys[90]}, + {Start: keys[90], End: endKey}, } jobToWorkerCh := make(chan *regionJob, 10) jobWg := new(sync.WaitGroup) @@ -2199,19 +2201,19 @@ func TestExternalEngineLoadIngestData(t *testing.T) { jobs = append(jobs, <-jobToWorkerCh) } sort.Slice(jobs, func(i, j int) bool { - return bytes.Compare(jobs[i].keyRange.start, jobs[j].keyRange.start) < 0 + return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0 }) - expectedKeyRanges := []Range{ - {start: keys[0], end: keys[30]}, - {start: keys[30], end: keys[50]}, - {start: keys[50], end: keys[60]}, - {start: keys[60], end: keys[90]}, - {start: keys[90], end: endKey}, + expectedKeyRanges := []common.Range{ + {Start: keys[0], End: keys[30]}, + {Start: keys[30], End: keys[50]}, + {Start: keys[50], End: keys[60]}, + {Start: keys[60], End: keys[90]}, + {Start: keys[90], End: endKey}, } kvIdx := 0 for i, job := range jobs { require.Equal(t, expectedKeyRanges[i], job.keyRange) - iter := job.ingestData.NewIter(ctx, job.keyRange.start, job.keyRange.end) + iter := job.ingestData.NewIter(ctx, 
job.keyRange.Start, job.keyRange.End) for iter.First(); iter.Valid(); iter.Next() { require.Equal(t, keys[kvIdx], iter.Key()) require.Equal(t, values[kvIdx], iter.Value()) diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index ac456b7df9..ea659483ec 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -112,7 +112,7 @@ func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tabl // Too many split&scatter requests may put a lot of pressure on TiKV and PD. func (local *Backend) SplitAndScatterRegionInBatches( ctx context.Context, - ranges []Range, + ranges []common.Range, needSplit bool, batchCnt int, ) error { @@ -134,7 +134,7 @@ func (local *Backend) SplitAndScatterRegionInBatches( // TODO: remove this file and use br internal functions func (local *Backend) SplitAndScatterRegionByRanges( ctx context.Context, - ranges []Range, + ranges []common.Range, needSplit bool, ) (err error) { if len(ranges) == 0 { @@ -150,8 +150,8 @@ func (local *Backend) SplitAndScatterRegionByRanges( }() } - minKey := codec.EncodeBytes([]byte{}, ranges[0].start) - maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end) + minKey := codec.EncodeBytes([]byte{}, ranges[0].Start) + maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].End) scatterRegions := make([]*split.RegionInfo, 0) var retryKeys [][]byte @@ -192,12 +192,12 @@ func (local *Backend) SplitAndScatterRegionByRanges( break } - needSplitRanges := make([]Range, 0, len(ranges)) + needSplitRanges := make([]common.Range, 0, len(ranges)) startKey := make([]byte, 0) endKey := make([]byte, 0) for _, r := range ranges { - startKey = codec.EncodeBytes(startKey, r.start) - endKey = codec.EncodeBytes(endKey, r.end) + startKey = codec.EncodeBytes(startKey, r.Start) + endKey = codec.EncodeBytes(endKey, r.End) idx := sort.Search(len(regions), func(i int) bool { return beforeEnd(startKey, regions[i].Region.EndKey) }) @@ -514,15 +514,15 @@ func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regio } } -func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte { +func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte { checkKeys := make([][]byte, 0) var lastEnd []byte for _, rg := range ranges { - if !bytes.Equal(lastEnd, rg.start) { - checkKeys = append(checkKeys, rg.start) + if !bytes.Equal(lastEnd, rg.Start) { + checkKeys = append(checkKeys, rg.Start) } - checkKeys = append(checkKeys, rg.end) - lastEnd = rg.end + checkKeys = append(checkKeys, rg.End) + lastEnd = rg.End } return getSplitKeys(checkKeys, regions, logger) } @@ -588,22 +588,22 @@ func keyInsideRegion(region *metapb.Region, key []byte) bool { return bytes.Compare(key, region.GetStartKey()) >= 0 && (beforeEnd(key, region.GetEndKey())) } -func intersectRange(region *metapb.Region, rg Range) Range { +func intersectRange(region *metapb.Region, rg common.Range) common.Range { var startKey, endKey []byte if len(region.StartKey) > 0 { _, startKey, _ = codec.DecodeBytes(region.StartKey, []byte{}) } - if bytes.Compare(startKey, rg.start) < 0 { - startKey = rg.start + if bytes.Compare(startKey, rg.Start) < 0 { + startKey = rg.Start } if len(region.EndKey) > 0 { _, endKey, _ = codec.DecodeBytes(region.EndKey, []byte{}) } - if beforeEnd(rg.end, endKey) { - endKey = rg.end + if beforeEnd(rg.End, endKey) { + endKey = rg.End } - return 
Range{start: startKey, end: endKey} + return common.Range{Start: startKey, End: endKey} } // StoreWriteLimiter is used to limit the write rate of a store. diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 2554e4d83d..72dc3184f7 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/kv" @@ -469,11 +470,11 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")}) // generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz) - ranges := make([]Range, 0) + ranges := make([]common.Range, 0) start := []byte{'b'} for i := byte('a'); i <= 'z'; i++ { end := []byte{'b', i} - ranges = append(ranges, Range{start: start, end: end}) + ranges = append(ranges, common.Range{Start: start, End: end}) start = end } @@ -559,11 +560,11 @@ func TestMissingScatter(t *testing.T) { checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")}) // generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz) - ranges := make([]Range, 0) + ranges := make([]common.Range, 0) start := []byte{'b'} for i := byte('a'); i <= 'z'; i++ { end := []byte{'b', i} - ranges = append(ranges, Range{start: start, end: end}) + ranges = append(ranges, common.Range{Start: start, End: end}) start = end } @@ -722,11 +723,11 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var ranges []Range + var ranges []common.Range for i := 0; i < 20; i++ { - ranges = append(ranges, Range{ - start: []byte(fmt.Sprintf("a%02d", i)), - end: []byte(fmt.Sprintf("a%02d", i+1)), + ranges = append(ranges, common.Range{ + Start: []byte(fmt.Sprintf("a%02d", i)), + End: []byte(fmt.Sprintf("a%02d", i+1)), }) } @@ -820,9 +821,9 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) { } start := rangeKeys[0] - ranges := make([]Range, 0, len(rangeKeys)-1) + ranges := make([]common.Range, 0, len(rangeKeys)-1) for _, e := range rangeKeys[1:] { - ranges = append(ranges, Range{start: start, end: e}) + ranges = append(ranges, common.Range{Start: start, End: e}) start = e } diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index f986bd3188..3a38e91895 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -94,7 +94,7 @@ func (j jobStageTp) String() string { // to a region. The keyRange may be changed when processing because of writing // partial data to TiKV or region split. 
type regionJob struct { - keyRange Range + keyRange common.Range // TODO: check the keyRange so that it's always included in region region *split.RegionInfo // stage should be updated only by convertStageTo @@ -189,16 +189,16 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { begin := time.Now() region := j.region.Region - firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.start, j.keyRange.end) + firstKey, lastKey, err := j.ingestData.GetFirstAndLastKey(j.keyRange.Start, j.keyRange.End) if err != nil { return errors.Trace(err) } if firstKey == nil { j.convertStageTo(ingested) log.FromContext(ctx).Debug("keys within region is empty, skip doIngest", - logutil.Key("start", j.keyRange.start), + logutil.Key("start", j.keyRange.Start), logutil.Key("regionStart", region.StartKey), - logutil.Key("end", j.keyRange.end), + logutil.Key("end", j.keyRange.End), logutil.Key("regionEnd", region.EndKey)) return nil } @@ -298,7 +298,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { return nil } - iter := j.ingestData.NewIter(ctx, j.keyRange.start, j.keyRange.end) + iter := j.ingestData.NewIter(ctx, j.keyRange.Start, j.keyRange.End) //nolint: errcheck defer iter.Close() @@ -336,8 +336,8 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { log.FromContext(ctx).Info("write to tikv partial finish", zap.Int64("count", totalCount), zap.Int64("size", totalSize), - logutil.Key("startKey", j.keyRange.start), - logutil.Key("endKey", j.keyRange.end), + logutil.Key("startKey", j.keyRange.Start), + logutil.Key("endKey", j.keyRange.End), logutil.Key("remainStart", remainingStartKey), logutil.Region(region), logutil.Leader(j.region.Leader)) @@ -464,15 +464,15 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) { zap.Stringer("job stage", j.stage), logutil.ShortError(j.lastRetryableErr), j.region.ToZapFields(), - logutil.Key("start", j.keyRange.start), - logutil.Key("end", j.keyRange.end)) + logutil.Key("start", j.keyRange.Start), + logutil.Key("end", j.keyRange.End)) return nil } log.FromContext(ctx).Warn("meet error and will doIngest region again", logutil.ShortError(j.lastRetryableErr), j.region.ToZapFields(), - logutil.Key("start", j.keyRange.start), - logutil.Key("end", j.keyRange.end)) + logutil.Key("start", j.keyRange.Start), + logutil.Key("end", j.keyRange.End)) } return nil } diff --git a/br/pkg/lightning/backend/local/region_job_test.go b/br/pkg/lightning/backend/local/region_job_test.go index fd508372ef..9117963360 100644 --- a/br/pkg/lightning/backend/local/region_job_test.go +++ b/br/pkg/lightning/backend/local/region_job_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" ) @@ -56,9 +57,9 @@ func TestIsIngestRetryable(t *testing.T) { } job := regionJob{ stage: wrote, - keyRange: Range{ - start: []byte{1}, - end: []byte{3}, + keyRange: common.Range{ + Start: []byte{1}, + End: []byte{3}, }, region: region, writeResult: &tikvWriteResult{ @@ -206,8 +207,8 @@ func TestRegionJobRetryer(t *testing.T) { } job := ®ionJob{ - keyRange: Range{ - start: []byte("123"), + keyRange: common.Range{ + Start: []byte("123"), }, waitUntil: time.Now().Add(-time.Second), } @@ -235,8 +236,8 @@ func TestRegionJobRetryer(t *testing.T) { retryer = 
startRegionJobRetryer(ctx, putBackCh, &jobWg) job = &regionJob{ - keyRange: Range{ - start: []byte("123"), + keyRange: common.Range{ + Start: []byte("123"), }, waitUntil: time.Now().Add(-time.Second), } @@ -246,8 +247,8 @@ time.Sleep(3 * time.Second) // now retryer is sending to putBackCh, but putBackCh is blocked job = &regionJob{ - keyRange: Range{ - start: []byte("456"), + keyRange: common.Range{ + Start: []byte("456"), }, waitUntil: time.Now().Add(-time.Second), }
diff --git a/br/pkg/lightning/common/engine.go b/br/pkg/lightning/common/engine.go
index d391b9a84c..35c7125193 100644
--- a/br/pkg/lightning/common/engine.go
+++ b/br/pkg/lightning/common/engine.go
@@ -14,12 +14,30 @@
 package common
 
-import "context"
+import (
+	"context"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
+)
+
+// Range contains a start key and an end key.
+type Range struct {
+	Start []byte
+	End   []byte // End is always exclusive except import_sstpb.SSTMeta
+}
 
 // Engine describes the common interface of local and external engine that
 // local backend uses.
 type Engine interface {
+	// ID is the identifier of an engine.
+	ID() string
 	// LoadIngestData returns an IngestData that contains the data in [start, end).
 	LoadIngestData(ctx context.Context, start, end []byte) (IngestData, error)
+	// KVStatistics returns the total kv size and total kv length.
+	KVStatistics() (totalKVSize int64, totalKVLength int64)
+	// ImportedStatistics returns the imported kv size and imported kv length.
+	ImportedStatistics() (importedKVSize int64, importedKVLength int64)
+	// SplitRanges splits the range [startKey, endKey) into multiple ranges.
+	SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]Range, error)
 	// TODO(lance6716): add more methods
 }
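
Review note, not part of the patch: the external engine's SplitRanges ignores the size/keys limits and simply turns its precomputed split keys into half-open ranges. The sketch below reimplements that loop in a standalone form so the expected output is easy to eyeball; the local Range type and the sample keys are stand-ins, not code from this change.

```go
package main

import "fmt"

// Range is a stand-in for common.Range: Start is inclusive, End exclusive.
type Range struct {
	Start, End []byte
}

// splitBySplitKeys is a simplified, behaviorally equivalent version of the
// loop in the external Engine.SplitRanges: each split key closes the previous
// range and opens the next one, and the last range is capped at endKey.
func splitBySplitKeys(startKey, endKey []byte, splitKeys [][]byte) []Range {
	ranges := make([]Range, 0, len(splitKeys)+1)
	ranges = append(ranges, Range{Start: startKey})
	for _, k := range splitKeys {
		ranges[len(ranges)-1].End = k
		ranges = append(ranges, Range{Start: k})
	}
	ranges[len(ranges)-1].End = endKey
	return ranges
}

func main() {
	for _, r := range splitBySplitKeys([]byte("a"), []byte("d"), [][]byte{[]byte("b"), []byte("c")}) {
		fmt.Printf("[%s, %s)\n", r.Start, r.End)
	}
	// Output:
	// [a, b)
	// [b, c)
	// [c, d)
}
```

So n split keys always produce n+1 contiguous ranges covering [startKey, endKey).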
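Review note, not part of the patch: with common.Engine now exposing ID, KVStatistics, ImportedStatistics and SplitRanges, callers such as doImport and prepareAndSendJob can stay agnostic of whether the engine is the local pebble-backed one or the external-storage one. A hypothetical helper in the spirit of the refactored readAndSplitIntoRange could be written purely against the interface; the function name and its placement in package local are illustrative assumptions, only the interface methods come from this patch.

```go
package local

import (
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"go.uber.org/zap"
)

// splitEngineForImport is a hypothetical helper (illustrative only) that
// splits any common.Engine into ranges bounded by sizeLimit/keysLimit,
// mirroring the small-engine fast path of readAndSplitIntoRange.
func splitEngineForImport(
	engine common.Engine,
	startKey, endKey []byte,
	sizeLimit, keysLimit int64,
	logger log.Logger,
) ([]common.Range, error) {
	totalSize, totalLength := engine.KVStatistics()
	if totalSize <= sizeLimit && totalLength <= keysLimit {
		// The whole engine fits into one region-sized range.
		return []common.Range{{Start: startKey, End: endKey}}, nil
	}
	ranges, err := engine.SplitRanges(startKey, endKey, sizeLimit, keysLimit, logger)
	if err != nil {
		return nil, err
	}
	logger.Info("split engine key ranges",
		zap.String("engine", engine.ID()),
		zap.Int64("totalSize", totalSize),
		zap.Int64("totalCount", totalLength),
		zap.Int("ranges", len(ranges)))
	return ranges, nil
}
```

The local Engine satisfies SplitRanges by consulting pebble size properties, while the external Engine returns ranges derived from its precomputed split keys, so the same call site covers both.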