lightning: fix check disk quota routine blocking when some engine is importing (#44877)
close pingcap/tidb#44867
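What changed: the `Engine.db` field goes from a plain `*pebble.DB`, which `getEngineFileSize()` read under `e.mutex` (the same mutex an import holds via `f.lock(importMutexStateImport)`), to an `atomic.Pointer[pebble.DB]` plus a `getDB()` accessor. The disk-quota check can now read the DB handle lock-free, so it no longer blocks while an engine is importing; the new `TestGetEngineSizeWhenImport` test exercises exactly that. The sketch below illustrates the pattern outside of lightning: it is a minimal, self-contained example, with `fakeDB` standing in for `*pebble.DB` and all names chosen for illustration only, not lightning's actual code.

package main

import (
	"fmt"
	"sync/atomic"
)

// fakeDB stands in for *pebble.DB so the sketch compiles on its own.
type fakeDB struct{ name string }

type engine struct {
	// Previously a plain pointer guarded by the engine mutex; now an
	// atomic pointer that any goroutine can read without locking.
	db atomic.Pointer[fakeDB]
}

// getDB mirrors the accessor added in the diff: a lock-free read of the
// current DB handle (nil once the engine has been closed).
func (e *engine) getDB() *fakeDB { return e.db.Load() }

// close mirrors Engine.Close: load once, release, then publish nil.
func (e *engine) close() {
	db := e.getDB()
	if db == nil {
		return
	}
	// release db resources here (pebble's db.Close() in the real code)
	e.db.Store(nil)
}

func main() {
	e := &engine{}
	e.db.Store(&fakeDB{name: "engine-0"})
	fmt.Println(e.getDB() != nil) // true: readers see the handle without locking
	e.close()
	fmt.Println(e.getDB() == nil) // true: a closed engine reports a nil handle
}

With this shape, a routine such as the disk-quota checker only needs a nil check on the loaded pointer, which is why the mutex acquisition could be dropped from `getEngineFileSize()` in the diff below.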
@@ -106,7 +106,7 @@ func (r *syncedRanges) reset() {
 type Engine struct {
 	engineMeta
 	closed atomic.Bool
-	db *pebble.DB
+	db atomic.Pointer[pebble.DB]
 	UUID uuid.UUID
 	localWriters sync.Map
 
@@ -157,14 +157,19 @@ func (e *Engine) setError(err error) {
 	}
 }
 
+func (e *Engine) getDB() *pebble.DB {
+	return e.db.Load()
+}
+
 // Close closes the engine and release all resources.
 func (e *Engine) Close() error {
 	e.logger.Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack"))
-	if e.db == nil {
+	db := e.getDB()
+	if db == nil {
 		return nil
 	}
-	err := errors.Trace(e.db.Close())
-	e.db = nil
+	err := errors.Trace(db.Close())
+	e.db.Store(nil)
 	return err
 }
 
@@ -471,9 +476,7 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter)
 }
 
 func (e *Engine) getEngineFileSize() backend.EngineFileSize {
-	e.mutex.RLock()
-	db := e.db
-	e.mutex.RUnlock()
+	db := e.getDB()
 
 	var total pebble.LevelMetrics
 	if db != nil {
@@ -885,7 +888,7 @@ func (e *Engine) flushEngineWithoutLock(ctx context.Context) error {
 		return err
 	}
 
-	flushFinishedCh, err := e.db.AsyncFlush()
+	flushFinishedCh, err := e.getDB().AsyncFlush()
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -913,11 +916,11 @@ func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error {
 func (e *Engine) saveEngineMeta() error {
 	e.logger.Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()),
 		zap.Int64("size", e.TotalSize.Load()))
-	return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.db))
+	return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB()))
 }
 
 func (e *Engine) loadEngineMeta() error {
-	jsonBytes, closer, err := e.db.Get(engineMetaKey)
+	jsonBytes, closer, err := e.getDB().Get(engineMetaKey)
 	if err != nil {
 		if err == pebble.ErrNotFound {
 			e.logger.Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err))
@@ -944,13 +947,13 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
 		opts = &newOpts
 	}
 	if !e.duplicateDetection {
-		return pebbleIter{Iterator: e.db.NewIter(opts)}
+		return pebbleIter{Iterator: e.getDB().NewIter(opts)}
 	}
 	logger := log.FromContext(ctx).With(
 		zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
 		zap.Int64("tableID", e.tableInfo.ID),
 		zap.Stringer("engineUUID", e.UUID))
-	return newDupDetectIter(e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
+	return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
 }
 
 // getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
@@ -1525,8 +1528,9 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error {
 	for _, m := range metas {
 		paths = append(paths, m.path)
 	}
-	if i.e.db == nil {
+	db := i.e.getDB()
+	if db == nil {
 		return errorEngineClosed
 	}
-	return i.e.db.Ingest(paths)
+	return db.Ingest(paths)
 }

@@ -21,6 +21,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"sync"
 	"testing"
 
 	"github.com/cockroachdb/pebble"
@@ -41,6 +42,44 @@ func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
 	return db, tmpPath
 }
 
+func TestGetEngineSizeWhenImport(t *testing.T) {
+	opt := &pebble.Options{
+		MemTableSize: 1024 * 1024,
+		MaxConcurrentCompactions: 16,
+		L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction
+		L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction
+		DisableWAL: true,
+		ReadOnly: false,
+	}
+	db, tmpPath := makePebbleDB(t, opt)
+
+	_, engineUUID := backend.MakeUUID("ww", 0)
+	engineCtx, cancel := context.WithCancel(context.Background())
+	f := &Engine{
+		UUID: engineUUID,
+		sstDir: tmpPath,
+		ctx: engineCtx,
+		cancel: cancel,
+		sstMetasChan: make(chan metaOrFlush, 64),
+		keyAdapter: noopKeyAdapter{},
+		logger: log.L(),
+	}
+	f.db.Store(db)
+	// simulate import
+	f.lock(importMutexStateImport)
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		engineFileSize := f.getEngineFileSize()
+		require.Equal(t, f.UUID, engineFileSize.UUID)
+		require.True(t, engineFileSize.IsImporting)
+	}()
+	wg.Wait()
+	f.unlock()
+	require.NoError(t, f.Close())
+}
+
 func TestIngestSSTWithClosedEngine(t *testing.T) {
 	opt := &pebble.Options{
 		MemTableSize: 1024 * 1024,
@@ -55,7 +94,6 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -64,6 +102,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
+	f.db.Store(db)
 	f.sstIngester = dbSSTIngester{e: f}
 	sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
 	file, err := os.Create(sstPath)
@@ -93,9 +132,9 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 func TestGetFirstAndLastKey(t *testing.T) {
 	db, tmpPath := makePebbleDB(t, nil)
 	f := &Engine{
-		db: db,
 		sstDir: tmpPath,
 	}
+	f.db.Store(db)
 	err := db.Set([]byte("a"), []byte("a"), nil)
 	require.NoError(t, err)
 	err = db.Set([]byte("c"), []byte("c"), nil)

@@ -895,7 +895,7 @@ func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig,
 	engine := e.(*Engine)
 	engine.lock(importMutexStateOpen)
 	defer engine.unlock()
-	engine.db = db
+	engine.db.Store(db)
 	engine.sstIngester = dbSSTIngester{e: engine}
 	if err = engine.loadEngineMeta(); err != nil {
 		return errors.Trace(err)
@@ -934,7 +934,6 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
 	}
 	engine := &Engine{
 		UUID: engineUUID,
-		db: db,
 		sstMetasChan: make(chan metaOrFlush),
 		tableInfo: cfg.TableInfo,
 		keyAdapter: local.keyAdapter,
@@ -943,6 +942,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
 		duplicateDB: local.duplicateDB,
 		logger: log.FromContext(ctx),
 	}
+	engine.db.Store(db)
 	engine.sstIngester = dbSSTIngester{e: engine}
 	if err = engine.loadEngineMeta(); err != nil {
 		return err
@@ -1038,7 +1038,7 @@ func (local *Backend) readAndSplitIntoRange(
 	}
 
 	logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))
-	sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter)
+	sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -1127,7 +1127,7 @@ func (local *Backend) generateAndSendJob(
 	// when use dynamic region feature, the region may be very big, we need
 	// to split to smaller ranges to increase the concurrency.
 	if regionSplitSize > 2*int64(config.SplitRegionSize) {
-		sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter)
+		sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1603,7 +1603,7 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
 	}
 	db, err := local.openEngineDB(engineUUID, false)
 	if err == nil {
-		localEngine.db = db
+		localEngine.db.Store(db)
 		localEngine.engineMeta = engineMeta{}
 		if !common.IsDirExists(localEngine.sstDir) {
 			if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil {

@@ -328,7 +328,6 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -337,6 +336,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
+	f.db.Store(db)
 	f.sstIngester = dbSSTIngester{e: f}
 	f.wg.Add(1)
 	go f.ingestSSTLoop()
@@ -480,7 +480,6 @@ func TestLocalIngestLoop(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
 	f := Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -493,6 +492,7 @@ func TestLocalIngestLoop(t *testing.T) {
 		},
 		logger: log.L(),
 	}
+	f.db.Store(db)
 	f.sstIngester = testIngester{}
 	f.wg.Add(1)
 	go f.ingestSSTLoop()
@@ -570,7 +570,6 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
 	engineCtx, cancel := context.WithCancel(context.Background())
 
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -583,6 +582,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
 		},
 		logger: log.L(),
 	}
+	f.db.Store(db)
 
 	createSSTWriter := func() (*sstWriter, error) {
 		path := filepath.Join(f.sstDir, uuid.New().String()+".sst")
@@ -1152,7 +1152,6 @@ func TestCheckPeersBusy(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel2 := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -1161,9 +1160,10 @@ func TestCheckPeersBusy(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
-	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	f.db.Store(db)
+	err := db.Set([]byte("a"), []byte("a"), nil)
 	require.NoError(t, err)
-	err = f.db.Set([]byte("b"), []byte("b"), nil)
+	err = db.Set([]byte("b"), []byte("b"), nil)
 	require.NoError(t, err)
 
 	jobCh := make(chan *regionJob, 10)
@@ -1288,7 +1288,6 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel2 := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -1297,7 +1296,8 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
-	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	f.db.Store(db)
+	err := db.Set([]byte("a"), []byte("a"), nil)
 	require.NoError(t, err)
 
 	jobCh := make(chan *regionJob, 10)
@@ -1395,7 +1395,6 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel2 := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -1404,9 +1403,10 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
-	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	f.db.Store(db)
+	err := db.Set([]byte("a"), []byte("a"), nil)
 	require.NoError(t, err)
-	err = f.db.Set([]byte("a2"), []byte("a2"), nil)
+	err = db.Set([]byte("a2"), []byte("a2"), nil)
 	require.NoError(t, err)
 
 	jobCh := make(chan *regionJob, 10)
@@ -1502,7 +1502,6 @@ func TestPartialWriteIngestBusy(t *testing.T) {
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel2 := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -1511,9 +1510,10 @@ func TestPartialWriteIngestBusy(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
-	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	f.db.Store(db)
+	err := db.Set([]byte("a"), []byte("a"), nil)
 	require.NoError(t, err)
-	err = f.db.Set([]byte("a2"), []byte("a2"), nil)
+	err = db.Set([]byte("a2"), []byte("a2"), nil)
 	require.NoError(t, err)
 
 	jobCh := make(chan *regionJob, 10)
@@ -1641,7 +1641,6 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
 	ctx := context.Background()
 	engineCtx, cancel := context.WithCancel(context.Background())
 	f := &Engine{
-		db: db,
 		UUID: engineUUID,
 		sstDir: tmpPath,
 		ctx: engineCtx,
@@ -1650,11 +1649,12 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
 		keyAdapter: noopKeyAdapter{},
 		logger: log.L(),
 	}
+	f.db.Store(db)
 	// keys starts with 0 is meta keys, so we start with 1.
 	for i := byte(1); i <= 10; i++ {
-		err := f.db.Set([]byte{i}, []byte{i}, nil)
+		err := db.Set([]byte{i}, []byte{i}, nil)
 		require.NoError(t, err)
-		err = f.db.Set([]byte{i, 1}, []byte{i, 1}, nil)
+		err = db.Set([]byte{i, 1}, []byte{i, 1}, nil)
 		require.NoError(t, err)
 	}
 