diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 4d9c30ff07..e022a4f8b8 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -1548,6 +1548,12 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { } ctx = context.WithValue(ctx, &checksumManagerKey, manager) + undo, err := rc.registerTaskToPD(ctx) + if err != nil { + return errors.Trace(err) + } + defer undo() + // Drop all secondary indexes before restore. if rc.cfg.TikvImporter.AddIndexBySQL { if err := rc.dropAllIndexes(ctx); err != nil { @@ -1738,6 +1744,32 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { return nil } +func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) { + etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg) + if err != nil { + return nil, errors.Trace(err) + } + + register := utils.NewTaskRegister(etcdCli, utils.RegisterLightning, fmt.Sprintf("lightning-%s", uuid.New())) + + undo = func() { + closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := register.Close(closeCtx); err != nil { + log.L().Warn("failed to unregister task", zap.Error(err)) + } + if err := etcdCli.Close(); err != nil { + log.L().Warn("failed to close etcd client", zap.Error(err)) + } + } + if err := register.RegisterTask(ctx); err != nil { + undo() + return nil, errors.Trace(err) + } + return undo, nil +} + func (rc *Controller) dropAllIndexes(ctx context.Context) error { for _, dbInfo := range rc.dbInfos { for _, tblInfo := range dbInfo.Tables {