From ce45eff1580ea279a67be64a86fbac5cda6a0907 Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Mon, 5 Aug 2024 12:29:10 +0800
Subject: [PATCH] br: error if the log restore has no full backup schema or id maps (#54421)

close pingcap/tidb#54418
---
 br/pkg/restore/log_client/client.go | 104 +++++++++++++++++-----------
 br/pkg/task/stream.go               |  23 ++----
 2 files changed, 67 insertions(+), 60 deletions(-)

diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go
index 9e11baf74b..fb34e709ce 100644
--- a/br/pkg/restore/log_client/client.go
+++ b/br/pkg/restore/log_client/client.go
@@ -20,6 +20,7 @@ import (
     "crypto/tls"
     "fmt"
     "math"
+    "os"
     "slices"
     "strconv"
     "strings"
@@ -637,15 +638,71 @@ type FullBackupStorageConfig struct {
 
 type InitSchemaConfig struct {
     // required
-    IsNewTask      bool
-    HasFullRestore bool
-    TableFilter    filter.Filter
+    IsNewTask   bool
+    TableFilter filter.Filter
 
     // optional
     TiFlashRecorder   *tiflashrec.TiFlashRecorder
     FullBackupStorage *FullBackupStorageConfig
 }
 
+const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
+
+func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
+    ctx context.Context,
+    cfg *InitSchemaConfig,
+) (map[stream.UpstreamID]*stream.DBReplace, error) {
+    dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace)
+    if cfg.FullBackupStorage == nil {
+        envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
+        if ok && len(envVal) > 0 {
+            log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
+            return dbReplaces, nil
+        }
+        return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
+    }
+    s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    for _, t := range fullBackupTables {
+        dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
+        newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
+        if !exist {
+            log.Info("db not existed", zap.String("dbname", dbName.String()))
+            continue
+        }
+
+        dbReplace, exist := dbReplaces[t.DB.ID]
+        if !exist {
+            dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
+            dbReplaces[t.DB.ID] = dbReplace
+        }
+
+        if t.Info == nil {
+            // If the db is empty, skip it.
+            continue
+        }
+        newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
+        if err != nil {
+            log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
+            continue
+        }
+
+        dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
+            Name:         newTableInfo.Name.O,
+            TableID:      newTableInfo.ID,
+            PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
+            IndexMap:     restoreutils.GetIndexIDMap(newTableInfo, t.Info),
+        }
+    }
+    return dbReplaces, nil
+}
+
 // InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas.
 // It is used to rewrite meta kv-event.
 func (rc *LogClient) InitSchemasReplaceForDDL(
@@ -658,7 +715,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
         // the id map doesn't need to construct only when it is not the first execution
         needConstructIdMap bool
 
-        dbReplaces = make(map[stream.UpstreamID]*stream.DBReplace)
+        dbReplaces map[stream.UpstreamID]*stream.DBReplace
     )
 
     // not new task, load schemas map from external storage
@@ -673,7 +730,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
 
     // a new task, but without full snapshot restore, tries to load
     // schemas map whose `restore-ts`` is the task's `start-ts`.
-    if len(dbMaps) <= 0 && !cfg.HasFullRestore {
+    if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
         log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
         needConstructIdMap = true
         dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
@@ -695,45 +752,10 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
     if len(dbMaps) <= 0 {
         log.Info("no id maps, build the table replaces from cluster and full backup schemas")
         needConstructIdMap = true
-        s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
+        dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg)
         if err != nil {
             return nil, errors.Trace(err)
         }
-        fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
-        if err != nil {
-            return nil, errors.Trace(err)
-        }
-        for _, t := range fullBackupTables {
-            dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
-            newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
-            if !exist {
-                log.Info("db not existed", zap.String("dbname", dbName.String()))
-                continue
-            }
-
-            dbReplace, exist := dbReplaces[t.DB.ID]
-            if !exist {
-                dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
-                dbReplaces[t.DB.ID] = dbReplace
-            }
-
-            if t.Info == nil {
-                // If the db is empty, skip it.
-                continue
-            }
-            newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
-            if err != nil {
-                log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
-                continue
-            }
-
-            dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
-                Name:         newTableInfo.Name.O,
-                TableID:      newTableInfo.ID,
-                PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
-                IndexMap:     restoreutils.GetIndexIDMap(newTableInfo, t.Info),
-            }
-        }
     } else {
         dbReplaces = stream.FromSchemaMaps(dbMaps)
     }
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 5ec178a4ce..29e3177df7 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -435,18 +435,6 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
         m.ClusterVersion = clusterVersion
     })
 
-    schemas := backup.NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error {
-        return backup.BuildFullSchema(storage, s.cfg.StartTS, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
-            fn(dbInfo, tableInfo)
-        })
-    }, 0)
-
-    err = schemas.BackupSchemas(ctx, metaWriter, nil, s.mgr.GetStorage(), nil,
-        s.cfg.StartTS, backup.DefaultSchemaConcurrency, 0, true, nil)
-    if err != nil {
-        return errors.Trace(err)
-    }
-
     if err = metaWriter.FlushBackupMeta(ctx); err != nil {
         return errors.Trace(err)
     }
@@ -1364,7 +1352,6 @@ func restoreStream(
     // get the schemas ID replace information.
     schemasReplace, err := client.InitSchemasReplaceForDDL(ctx, &logclient.InitSchemaConfig{
         IsNewTask:         newTask,
-        HasFullRestore:    len(cfg.FullBackupStorage) > 0,
         TableFilter:       cfg.TableFilter,
         TiFlashRecorder:   cfg.tiflashRecorder,
         FullBackupStorage: fullBackupStorage,
@@ -1686,13 +1673,11 @@ func getFullBackupTS(
 func parseFullBackupTablesStorage(
     cfg *RestoreConfig,
 ) (*logclient.FullBackupStorageConfig, error) {
-    var storageName string
-    if len(cfg.FullBackupStorage) > 0 {
-        storageName = cfg.FullBackupStorage
-    } else {
-        storageName = cfg.Storage
+    if len(cfg.FullBackupStorage) == 0 {
+        log.Info("the full backup path is not specified, so BR will try to get id maps")
+        return nil, nil
     }
-    u, err := storage.ParseBackend(storageName, &cfg.BackendOptions)
+    u, err := storage.ParseBackend(cfg.FullBackupStorage, &cfg.BackendOptions)
     if err != nil {
         return nil, errors.Trace(err)
     }
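
The following standalone Go sketch (not part of the patch) condenses the decision the change above introduces: when no saved pitr id maps exist and no full backup path is given, the restore now fails unless the UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL environment variable is set. The helper name chooseSchemaSource and its boolean parameters are hypothetical illustrations; only the environment variable name and the error message come from the diff.

package main

import (
    "fmt"
    "os"
)

const unsafePITREnv = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

// chooseSchemaSource mirrors the new precedence: saved id maps first, then the
// full backup schemas, then the unsafe environment-variable override, otherwise
// an error instead of silently starting with empty base schemas.
func chooseSchemaSource(hasIDMaps, hasFullBackupStorage bool, startTS uint64) (string, error) {
    if hasIDMaps {
        // A previous run already persisted the id maps; reuse them.
        return "load db replaces from the saved pitr id maps", nil
    }
    if hasFullBackupStorage {
        // A full backup path was supplied; rebuild the base schemas from it.
        return "build db replaces from the full backup schemas", nil
    }
    if v, ok := os.LookupEnv(unsafePITREnv); ok && len(v) > 0 {
        // Unsafe escape hatch: start the log restore without base schemas.
        return "skip loading the base schemas (unsafe override)", nil
    }
    return "", fmt.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", startTS)
}

func main() {
    source, err := chooseSchemaSource(false, false, 451168021348352000)
    fmt.Println(source, err)
}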