backend: ensure engine cleanup to be executed (#64105)
close pingcap/tidb#64101
@@ -54,6 +54,7 @@ type engineInfo struct {
     openedEngine *backend.OpenedEngine
     uuid         uuid.UUID
+    backend      backend.Backend
     writerCache  generic.SyncMap[int, backend.EngineWriter]
     memRoot      MemRoot
     flushLock    *sync.RWMutex
@@ -66,6 +67,7 @@ func newEngineInfo(
     unique bool,
     en *backend.OpenedEngine,
     uuid uuid.UUID,
+    bk backend.Backend,
     memRoot MemRoot,
 ) *engineInfo {
     return &engineInfo{
@@ -75,6 +77,7 @@ func newEngineInfo(
         unique:       unique,
         openedEngine: en,
         uuid:         uuid,
+        backend:      bk,
         writerCache:  generic.NewSyncMap[int, backend.EngineWriter](4),
         memRoot:      memRoot,
         flushLock:    &sync.RWMutex{},
@@ -104,26 +107,25 @@ func (ei *engineInfo) Close(cleanup bool) {
     }
     err := ei.closeWriters()
     if err != nil {
-        logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err),
+        logutil.Logger(ei.ctx).Warn(LitErrCloseWriterErr, zap.Error(err),
             zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
     }
-
-    indexEngine := ei.openedEngine
-    closedEngine, err := indexEngine.Close(ei.ctx)
+    if cleanup {
+        defer func() {
+            err = ei.backend.CleanupEngine(ei.ctx, ei.uuid)
+            if err != nil {
+                logutil.Logger(ei.ctx).Warn(LitErrCleanEngineErr, zap.Error(err),
+                    zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+            }
+        }()
+    }
+    _, err = ei.openedEngine.Close(ei.ctx)
     if err != nil {
-        logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err),
+        logutil.Logger(ei.ctx).Warn(LitErrCloseEngineErr, zap.Error(err),
             zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
         return
     }
     ei.openedEngine = nil
-    if cleanup {
-        // local intermediate files will be removed.
-        err = closedEngine.Cleanup(ei.ctx)
-        if err != nil {
-            logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err),
-                zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-        }
-    }
 }
 
 // writerContext is used to keep a lightning local writer for each backfill worker.
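The essence of the fix is in the hunk above: backend.CleanupEngine is now registered in a defer before openedEngine.Close is attempted, so the engine's intermediate data is cleaned up on every exit path, whereas previously the early return on a Close error skipped closedEngine.Cleanup entirely. Below is a stripped-down sketch of that control flow, using placeholder interfaces instead of the real lightning backend types (engineBackend, openedEngine, and engineHandle are illustrative names, not from the TiDB code base):

package sketch

import (
    "context"
    "log"

    "github.com/google/uuid"
)

// Placeholder interfaces standing in for the real lightning backend types.
type engineBackend interface {
    CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
}

type openedEngine interface {
    Close(ctx context.Context) error
}

type engineHandle struct {
    opened  openedEngine
    backend engineBackend
    uuid    uuid.UUID
}

// close mirrors the new control flow of engineInfo.Close: when cleanup is
// requested, CleanupEngine is deferred *before* Close is attempted, so the
// cleanup runs on every exit path, including the early return taken when
// closing the engine fails.
func (e *engineHandle) close(ctx context.Context, cleanup bool) {
    if cleanup {
        defer func() {
            if err := e.backend.CleanupEngine(ctx, e.uuid); err != nil {
                log.Printf("cleanup engine failed: %v", err)
            }
        }()
    }
    if err := e.opened.Close(ctx); err != nil {
        log.Printf("close engine failed: %v", err)
        return // the deferred cleanup above still executes
    }
    e.opened = nil
}

Note that the real OpenedEngine.Close also returns a closed-engine handle; the sketch drops that value, just as the new code discards it with `_, err = ei.openedEngine.Close(ei.ctx)`.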
@@ -79,6 +79,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta
             uniques[i],
             openedEngine,
             openedEngine.GetEngineUUID(),
+            bc.backend,
             bc.memRoot,
         )
     }
@@ -1568,6 +1568,13 @@ type dbSSTIngester struct {
 }
 
 func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
+    failpoint.InjectCall("beforeMergeSSTs")
+    failpoint.Inject("mockErrInMergeSSTs", func(val failpoint.Value) {
+        if val.(bool) {
+            failpoint.Return(nil, errors.New("mocked error in mergeSSTs"))
+        }
+    })
+
     if len(metas) == 0 {
         return nil, errors.New("sst metas is empty")
     } else if len(metas) == 1 {
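For readers unfamiliar with the two markers added to mergeSSTs above: failpoint.InjectCall fires a callback that a test has registered for the named failpoint (the new test below registers one through testfailpoint.EnableCall), while failpoint.Inject runs its body only in failpoint-enabled builds when the named failpoint has been activated, and failpoint.Return then produces an early return with the given results. A minimal, self-contained illustration of the same pattern, with made-up function and failpoint names rather than the ones in this PR:

package sketch

import (
    "errors"

    "github.com/pingcap/failpoint"
)

// sumAll is a stand-in for mergeSSTs; "beforeSum" and "mockErrInSum" are
// illustrative failpoint names, not the ones introduced by this commit.
func sumAll(nums []int) (int, error) {
    // Invokes whatever callback a test has registered for "beforeSum";
    // does nothing when no callback is registered.
    failpoint.InjectCall("beforeSum")
    // When "mockErrInSum" is activated (e.g. with the term "1*return(true)"),
    // the body runs and failpoint.Return causes an early return.
    failpoint.Inject("mockErrInSum", func(val failpoint.Value) {
        if val.(bool) {
            failpoint.Return(0, errors.New("injected error in sumAll"))
        }
    })

    total := 0
    for _, n := range nums {
        total += n
    }
    return total, nil
}

In the test added later in this commit, mockErrInMergeSSTs is enabled with the term "1*return(true)", so it triggers a single time, and the beforeMergeSSTs hook is used to delete the intermediate .sst files from the temp directory before the merge runs.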
@@ -11,7 +11,7 @@ go_test(
         "temp_index_test.go",
     ],
     flaky = True,
-    shard_count = 37,
+    shard_count = 38,
     deps = [
         "//pkg/config",
         "//pkg/config/kerneltype",
@@ -17,6 +17,9 @@ package addindextest_test
 import (
     "context"
     "fmt"
+    "io/fs"
+    "os"
+    "path/filepath"
     "strings"
     "sync"
     "sync/atomic"
@@ -505,6 +508,47 @@ func TestAddIndexAdvanceWatermarkFailed(t *testing.T) {
     tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrDupEntry)
 }
 
+func TestAddIndexTempDirDataRemoved(t *testing.T) {
+    if kerneltype.IsNextGen() {
+        t.Skip("next-gen doesn't use local backend")
+    }
+    tempDir := t.TempDir()
+    defer config.RestoreFunc()()
+    config.UpdateGlobal(func(conf *config.Config) {
+        conf.TempDir = tempDir
+    })
+    store := realtikvtest.CreateMockStoreAndSetup(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+
+    tk.MustExec("create table t (a int);")
+    tk.MustExec("insert into t values (1), (1), (1);")
+
+    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/lightning/backend/local/mockErrInMergeSSTs", "1*return(true)")
+    removeOnce := sync.Once{}
+    removed := false
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/lightning/backend/local/beforeMergeSSTs", func() {
+        removeOnce.Do(func() {
+            var filesToRemove []string
+            filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error {
+                if strings.HasSuffix(path, ".sst") {
+                    filesToRemove = append(filesToRemove, path)
+                }
+                return nil
+            })
+            for _, f := range filesToRemove {
+                t.Log("removed " + f)
+                err := os.RemoveAll(f)
+                require.NoError(t, err)
+                removed = true
+            }
+        })
+    })
+
+    tk.MustExec("alter table t add index idx(a);")
+    require.True(t, removed)
+}
+
 func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
     if kerneltype.IsNextGen() {
         t.Skip("have overlapped ingest sst, skip")