From 4f2a69eca795e29a2d97878b2e304f0204f0cccc Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 17 Nov 2023 16:53:18 +0800 Subject: [PATCH] lightning: release memory pool when external engine close (#48537) close pingcap/tidb#48538 --- br/pkg/lightning/backend/external/engine.go | 17 ++++++++++++++++- br/pkg/lightning/backend/local/local.go | 10 ++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 4bf5a5bd14..398d23e1fc 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -379,7 +379,22 @@ func (e *Engine) SplitRanges( } // Close implements common.Engine. -func (e *Engine) Close() error { return nil } +func (e *Engine) Close() error { + if e.bufPool != nil { + e.bufPool.Destroy() + e.bufPool = nil + } + return nil +} + +// Reset resets the memory buffer pool. +func (e *Engine) Reset() error { + if e.bufPool != nil { + e.bufPool.Destroy() + e.bufPool = membuf.NewPool() + } + return nil +} // MemoryIngestData is the in-memory implementation of IngestData. type MemoryIngestData struct { diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 099404c484..34ee420c46 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1734,6 +1734,11 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 localEngine := local.lockEngine(engineUUID, importMutexStateClose) if localEngine == nil { + if engineI, ok := local.externalEngine[engineUUID]; ok { + extEngine := engineI.(*external.Engine) + return extEngine.Reset() + } + log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) return nil } @@ -1767,6 +1772,11 @@ func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) e localEngine := local.lockEngine(engineUUID, importMutexStateClose) // release this engine after import success if localEngine == nil { + if extEngine, ok := local.externalEngine[engineUUID]; ok { + retErr := extEngine.Close() + delete(local.externalEngine, engineUUID) + return retErr + } log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) return nil }