br: error if the log restore has no full backup schema or id maps (#54421)
close pingcap/tidb#54418
This commit is contained in:
@ -20,6 +20,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -637,15 +638,71 @@ type FullBackupStorageConfig struct {
|
||||
|
||||
type InitSchemaConfig struct {
|
||||
// required
|
||||
IsNewTask bool
|
||||
HasFullRestore bool
|
||||
TableFilter filter.Filter
|
||||
IsNewTask bool
|
||||
TableFilter filter.Filter
|
||||
|
||||
// optional
|
||||
TiFlashRecorder *tiflashrec.TiFlashRecorder
|
||||
FullBackupStorage *FullBackupStorageConfig
|
||||
}
|
||||
|
||||
const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
|
||||
|
||||
func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
|
||||
ctx context.Context,
|
||||
cfg *InitSchemaConfig,
|
||||
) (map[stream.UpstreamID]*stream.DBReplace, error) {
|
||||
dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace)
|
||||
if cfg.FullBackupStorage == nil {
|
||||
envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
|
||||
if ok && len(envVal) > 0 {
|
||||
log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
|
||||
return dbReplaces, nil
|
||||
}
|
||||
return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
|
||||
}
|
||||
s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
for _, t := range fullBackupTables {
|
||||
dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
|
||||
newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
|
||||
if !exist {
|
||||
log.Info("db not existed", zap.String("dbname", dbName.String()))
|
||||
continue
|
||||
}
|
||||
|
||||
dbReplace, exist := dbReplaces[t.DB.ID]
|
||||
if !exist {
|
||||
dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
|
||||
dbReplaces[t.DB.ID] = dbReplace
|
||||
}
|
||||
|
||||
if t.Info == nil {
|
||||
// If the db is empty, skip it.
|
||||
continue
|
||||
}
|
||||
newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
|
||||
if err != nil {
|
||||
log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
|
||||
continue
|
||||
}
|
||||
|
||||
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
|
||||
Name: newTableInfo.Name.O,
|
||||
TableID: newTableInfo.ID,
|
||||
PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
|
||||
IndexMap: restoreutils.GetIndexIDMap(newTableInfo, t.Info),
|
||||
}
|
||||
}
|
||||
return dbReplaces, nil
|
||||
}
|
||||
|
||||
// InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas.
|
||||
// It is used to rewrite meta kv-event.
|
||||
func (rc *LogClient) InitSchemasReplaceForDDL(
|
||||
@ -658,7 +715,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
|
||||
// the id map doesn't need to construct only when it is not the first execution
|
||||
needConstructIdMap bool
|
||||
|
||||
dbReplaces = make(map[stream.UpstreamID]*stream.DBReplace)
|
||||
dbReplaces map[stream.UpstreamID]*stream.DBReplace
|
||||
)
|
||||
|
||||
// not new task, load schemas map from external storage
|
||||
@ -673,7 +730,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
|
||||
|
||||
// a new task, but without full snapshot restore, tries to load
|
||||
// schemas map whose `restore-ts`` is the task's `start-ts`.
|
||||
if len(dbMaps) <= 0 && !cfg.HasFullRestore {
|
||||
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
|
||||
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
|
||||
needConstructIdMap = true
|
||||
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
|
||||
@ -695,45 +752,10 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
|
||||
if len(dbMaps) <= 0 {
|
||||
log.Info("no id maps, build the table replaces from cluster and full backup schemas")
|
||||
needConstructIdMap = true
|
||||
s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
|
||||
dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
for _, t := range fullBackupTables {
|
||||
dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
|
||||
newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
|
||||
if !exist {
|
||||
log.Info("db not existed", zap.String("dbname", dbName.String()))
|
||||
continue
|
||||
}
|
||||
|
||||
dbReplace, exist := dbReplaces[t.DB.ID]
|
||||
if !exist {
|
||||
dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
|
||||
dbReplaces[t.DB.ID] = dbReplace
|
||||
}
|
||||
|
||||
if t.Info == nil {
|
||||
// If the db is empty, skip it.
|
||||
continue
|
||||
}
|
||||
newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
|
||||
if err != nil {
|
||||
log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
|
||||
continue
|
||||
}
|
||||
|
||||
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
|
||||
Name: newTableInfo.Name.O,
|
||||
TableID: newTableInfo.ID,
|
||||
PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
|
||||
IndexMap: restoreutils.GetIndexIDMap(newTableInfo, t.Info),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dbReplaces = stream.FromSchemaMaps(dbMaps)
|
||||
}
|
||||
|
||||
@ -435,18 +435,6 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
|
||||
m.ClusterVersion = clusterVersion
|
||||
})
|
||||
|
||||
schemas := backup.NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error {
|
||||
return backup.BuildFullSchema(storage, s.cfg.StartTS, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
|
||||
fn(dbInfo, tableInfo)
|
||||
})
|
||||
}, 0)
|
||||
|
||||
err = schemas.BackupSchemas(ctx, metaWriter, nil, s.mgr.GetStorage(), nil,
|
||||
s.cfg.StartTS, backup.DefaultSchemaConcurrency, 0, true, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = metaWriter.FlushBackupMeta(ctx); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -1364,7 +1352,6 @@ func restoreStream(
|
||||
// get the schemas ID replace information.
|
||||
schemasReplace, err := client.InitSchemasReplaceForDDL(ctx, &logclient.InitSchemaConfig{
|
||||
IsNewTask: newTask,
|
||||
HasFullRestore: len(cfg.FullBackupStorage) > 0,
|
||||
TableFilter: cfg.TableFilter,
|
||||
TiFlashRecorder: cfg.tiflashRecorder,
|
||||
FullBackupStorage: fullBackupStorage,
|
||||
@ -1686,13 +1673,11 @@ func getFullBackupTS(
|
||||
func parseFullBackupTablesStorage(
|
||||
cfg *RestoreConfig,
|
||||
) (*logclient.FullBackupStorageConfig, error) {
|
||||
var storageName string
|
||||
if len(cfg.FullBackupStorage) > 0 {
|
||||
storageName = cfg.FullBackupStorage
|
||||
} else {
|
||||
storageName = cfg.Storage
|
||||
if len(cfg.FullBackupStorage) == 0 {
|
||||
log.Info("the full backup path is not specified, so BR will try to get id maps")
|
||||
return nil, nil
|
||||
}
|
||||
u, err := storage.ParseBackend(storageName, &cfg.BackendOptions)
|
||||
u, err := storage.ParseBackend(cfg.FullBackupStorage, &cfg.BackendOptions)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user