diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index ee214aa469..daf19e4e21 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -54,6 +54,7 @@ type engineInfo struct { openedEngine *backend.OpenedEngine uuid uuid.UUID + backend backend.Backend writerCache generic.SyncMap[int, backend.EngineWriter] memRoot MemRoot flushLock *sync.RWMutex @@ -66,6 +67,7 @@ func newEngineInfo( unique bool, en *backend.OpenedEngine, uuid uuid.UUID, + bk backend.Backend, memRoot MemRoot, ) *engineInfo { return &engineInfo{ @@ -75,6 +77,7 @@ func newEngineInfo( unique: unique, openedEngine: en, uuid: uuid, + backend: bk, writerCache: generic.NewSyncMap[int, backend.EngineWriter](4), memRoot: memRoot, flushLock: &sync.RWMutex{}, @@ -104,26 +107,25 @@ func (ei *engineInfo) Close(cleanup bool) { } err := ei.closeWriters() if err != nil { - logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err), + logutil.Logger(ei.ctx).Warn(LitErrCloseWriterErr, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) } - - indexEngine := ei.openedEngine - closedEngine, err := indexEngine.Close(ei.ctx) + if cleanup { + defer func() { + err = ei.backend.CleanupEngine(ei.ctx, ei.uuid) + if err != nil { + logutil.Logger(ei.ctx).Warn(LitErrCleanEngineErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } + }() + } + _, err = ei.openedEngine.Close(ei.ctx) if err != nil { - logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err), + logutil.Logger(ei.ctx).Warn(LitErrCloseEngineErr, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) return } ei.openedEngine = nil - if cleanup { - // local intermediate files will be removed. - err = closedEngine.Cleanup(ei.ctx) - if err != nil { - logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err), - zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - } - } } // writerContext is used to keep a lightning local writer for each backfill worker. diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index f53c29d89e..88cf5a49da 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -79,6 +79,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta uniques[i], openedEngine, openedEngine.GetEngineUUID(), + bc.backend, bc.memRoot, ) } diff --git a/pkg/lightning/backend/local/engine.go b/pkg/lightning/backend/local/engine.go index c8c64ea4d5..3a051b608d 100644 --- a/pkg/lightning/backend/local/engine.go +++ b/pkg/lightning/backend/local/engine.go @@ -1568,6 +1568,13 @@ type dbSSTIngester struct { } func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) { + failpoint.InjectCall("beforeMergeSSTs") + failpoint.Inject("mockErrInMergeSSTs", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("mocked error in mergeSSTs")) + } + }) + if len(metas) == 0 { return nil, errors.New("sst metas is empty") } else if len(metas) == 1 { diff --git a/tests/realtikvtest/addindextest3/BUILD.bazel b/tests/realtikvtest/addindextest3/BUILD.bazel index 15e6e98671..d630992643 100644 --- a/tests/realtikvtest/addindextest3/BUILD.bazel +++ b/tests/realtikvtest/addindextest3/BUILD.bazel @@ -11,7 +11,7 @@ go_test( "temp_index_test.go", ], flaky = True, - shard_count = 37, + shard_count = 38, deps = [ "//pkg/config", "//pkg/config/kerneltype", diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 0f50b7d083..0c16d3458d 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -17,6 +17,9 @@ package addindextest_test import ( "context" "fmt" + "io/fs" + "os" + "path/filepath" "strings" "sync" "sync/atomic" @@ -505,6 +508,47 @@ func TestAddIndexAdvanceWatermarkFailed(t *testing.T) { tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrDupEntry) } +func TestAddIndexTempDirDataRemoved(t *testing.T) { + if kerneltype.IsNextGen() { + t.Skip("next-gen doesn't use local backend") + } + tempDir := t.TempDir() + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempDir = tempDir + }) + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1), (1), (1);") + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/lightning/backend/local/mockErrInMergeSSTs", "1*return(true)") + removeOnce := sync.Once{} + removed := false + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/lightning/backend/local/beforeMergeSSTs", func() { + removeOnce.Do(func() { + var filesToRemove []string + filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error { + if strings.HasSuffix(path, ".sst") { + filesToRemove = append(filesToRemove, path) + } + return nil + }) + for _, f := range filesToRemove { + t.Log("removed " + f) + err := os.RemoveAll(f) + require.NoError(t, err) + removed = true + } + }) + }) + + tk.MustExec("alter table t add index idx(a);") + require.True(t, removed) +} + func TestAddIndexRemoteDuplicateCheck(t *testing.T) { if kerneltype.IsNextGen() { t.Skip("have overlapped ingest sst, skip")