lightning: fix forgetting multiple regionJob use same IngestData (#47143)
ref pingcap/tidb#45719
This commit is contained in:
11
br/pkg/lightning/backend/external/engine.go
vendored
11
br/pkg/lightning/backend/external/engine.go
vendored
@ -153,6 +153,7 @@ func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.
|
||||
values: values,
|
||||
ts: e.ts,
|
||||
memBuf: memBuf,
|
||||
refCnt: atomic.NewInt64(0),
|
||||
importedKVSize: e.importedKVSize,
|
||||
importedKVCount: e.importedKVCount,
|
||||
}, nil
|
||||
@ -246,6 +247,7 @@ type MemoryIngestData struct {
|
||||
ts uint64
|
||||
|
||||
memBuf *membuf.Buffer
|
||||
refCnt *atomic.Int64
|
||||
importedKVSize *atomic.Int64
|
||||
importedKVCount *atomic.Int64
|
||||
}
|
||||
@ -429,9 +431,16 @@ func (m *MemoryIngestData) GetTS() uint64 {
|
||||
return m.ts
|
||||
}
|
||||
|
||||
// IncRef implements IngestData.IncRef.
|
||||
func (m *MemoryIngestData) IncRef() {
|
||||
m.refCnt.Inc()
|
||||
}
|
||||
|
||||
// Finish implements IngestData.Finish.
|
||||
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
|
||||
m.importedKVSize.Add(totalBytes)
|
||||
m.importedKVCount.Add(totalCount)
|
||||
m.memBuf.Destroy()
|
||||
if m.refCnt.Dec() == 0 {
|
||||
m.memBuf.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1032,6 +1032,9 @@ func (e *Engine) GetTS() uint64 {
|
||||
return e.TS
|
||||
}
|
||||
|
||||
// IncRef implements IngestData interface.
|
||||
func (*Engine) IncRef() {}
|
||||
|
||||
// Finish implements IngestData interface.
|
||||
func (e *Engine) Finish(totalBytes, totalCount int64) {
|
||||
e.importedKVSize.Add(totalBytes)
|
||||
|
||||
@ -1208,6 +1208,7 @@ func (local *Backend) generateAndSendJob(
|
||||
return err
|
||||
}
|
||||
for _, job := range jobs {
|
||||
data.IncRef()
|
||||
jobWg.Add(1)
|
||||
select {
|
||||
case <-egCtx.Done():
|
||||
@ -1341,7 +1342,12 @@ func (local *Backend) startWorker(
|
||||
return err2
|
||||
}
|
||||
// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
|
||||
jobWg.Add(len(jobs) - 1)
|
||||
newJobCnt := len(jobs) - 1
|
||||
jobWg.Add(newJobCnt)
|
||||
for newJobCnt > 0 {
|
||||
job.ingestData.IncRef()
|
||||
newJobCnt--
|
||||
}
|
||||
for _, j := range jobs {
|
||||
j.lastRetryableErr = job.lastRetryableErr
|
||||
jobOutCh <- j
|
||||
|
||||
@ -1207,6 +1207,8 @@ func (m mockIngestData) NewIter(ctx context.Context, lowerBound, upperBound []by
|
||||
|
||||
func (m mockIngestData) GetTS() uint64 { return 0 }
|
||||
|
||||
func (m mockIngestData) IncRef() {}
|
||||
|
||||
func (m mockIngestData) Finish(_, _ int64) {}
|
||||
|
||||
func TestCheckPeersBusy(t *testing.T) {
|
||||
|
||||
@ -27,6 +27,10 @@ type IngestData interface {
|
||||
NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter
|
||||
// GetTS will be used as the start/commit TS of the data.
|
||||
GetTS() uint64
|
||||
// IncRef should be called every time when IngestData is referred by regionJob.
|
||||
// Multiple regionJob can share one IngestData. Same amount of Finish should be
|
||||
// called to release the IngestData.
|
||||
IncRef()
|
||||
// Finish will be called when the data is ingested successfully.
|
||||
Finish(totalBytes, totalCount int64)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user