lightning: register import task to pd (#42504)

ref pingcap/tidb#41536
This commit is contained in:
Yujie Xia
2023-03-24 09:24:42 +08:00
committed by GitHub
parent 1d36047a47
commit da89b1b42b

View File

@ -1548,6 +1548,12 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}
ctx = context.WithValue(ctx, &checksumManagerKey, manager)
undo, err := rc.registerTaskToPD(ctx)
if err != nil {
return errors.Trace(err)
}
defer undo()
// Drop all secondary indexes before restore.
if rc.cfg.TikvImporter.AddIndexBySQL {
if err := rc.dropAllIndexes(ctx); err != nil {
@ -1738,6 +1744,32 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
return nil
}
func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg)
if err != nil {
return nil, errors.Trace(err)
}
register := utils.NewTaskRegister(etcdCli, utils.RegisterLightning, fmt.Sprintf("lightning-%s", uuid.New()))
undo = func() {
closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := register.Close(closeCtx); err != nil {
log.L().Warn("failed to unregister task", zap.Error(err))
}
if err := etcdCli.Close(); err != nil {
log.L().Warn("failed to close etcd client", zap.Error(err))
}
}
if err := register.RegisterTask(ctx); err != nil {
undo()
return nil, errors.Trace(err)
}
return undo, nil
}
func (rc *Controller) dropAllIndexes(ctx context.Context) error {
for _, dbInfo := range rc.dbInfos {
for _, tblInfo := range dbInfo.Tables {