diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index fc00974aa2..ce16825a03 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -153,6 +153,7 @@ func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common. values: values, ts: e.ts, memBuf: memBuf, + refCnt: atomic.NewInt64(0), importedKVSize: e.importedKVSize, importedKVCount: e.importedKVCount, }, nil @@ -246,6 +247,7 @@ type MemoryIngestData struct { ts uint64 memBuf *membuf.Buffer + refCnt *atomic.Int64 importedKVSize *atomic.Int64 importedKVCount *atomic.Int64 } @@ -429,9 +431,16 @@ func (m *MemoryIngestData) GetTS() uint64 { return m.ts } +// IncRef implements IngestData.IncRef. +func (m *MemoryIngestData) IncRef() { + m.refCnt.Inc() +} + // Finish implements IngestData.Finish. func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) { m.importedKVSize.Add(totalBytes) m.importedKVCount.Add(totalCount) - m.memBuf.Destroy() + if m.refCnt.Dec() == 0 { + m.memBuf.Destroy() + } } diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 7c2fa2082d..468416a39b 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1032,6 +1032,9 @@ func (e *Engine) GetTS() uint64 { return e.TS } +// IncRef implements IngestData interface. +func (*Engine) IncRef() {} + // Finish implements IngestData interface. func (e *Engine) Finish(totalBytes, totalCount int64) { e.importedKVSize.Add(totalBytes) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 63396b1613..418a4b8298 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1208,6 +1208,7 @@ func (local *Backend) generateAndSendJob( return err } for _, job := range jobs { + data.IncRef() jobWg.Add(1) select { case <-egCtx.Done(): @@ -1341,7 +1342,12 @@ func (local *Backend) startWorker( return err2 } // 1 "needRescan" job becomes len(jobs) "regionScanned" jobs. - jobWg.Add(len(jobs) - 1) + newJobCnt := len(jobs) - 1 + jobWg.Add(newJobCnt) + for newJobCnt > 0 { + job.ingestData.IncRef() + newJobCnt-- + } for _, j := range jobs { j.lastRetryableErr = job.lastRetryableErr jobOutCh <- j diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index d1ae884608..c097c5e519 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1207,6 +1207,8 @@ func (m mockIngestData) NewIter(ctx context.Context, lowerBound, upperBound []by func (m mockIngestData) GetTS() uint64 { return 0 } +func (m mockIngestData) IncRef() {} + func (m mockIngestData) Finish(_, _ int64) {} func TestCheckPeersBusy(t *testing.T) { diff --git a/br/pkg/lightning/common/ingest_data.go b/br/pkg/lightning/common/ingest_data.go index 7815bee56c..13c22d8abe 100644 --- a/br/pkg/lightning/common/ingest_data.go +++ b/br/pkg/lightning/common/ingest_data.go @@ -27,6 +27,10 @@ type IngestData interface { NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter // GetTS will be used as the start/commit TS of the data. GetTS() uint64 + // IncRef should be called every time when IngestData is referred by regionJob. + // Multiple regionJob can share one IngestData. Same amount of Finish should be + // called to release the IngestData. + IncRef() // Finish will be called when the data is ingested successfully. Finish(totalBytes, totalCount int64) }