diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go
index e96855edc6..379034ddd5 100644
--- a/br/pkg/errors/errors.go
+++ b/br/pkg/errors/errors.go
@@ -75,7 +75,8 @@ var (
 	ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))
 
 	// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
-	ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
+	ErrStreamLogTaskExist        = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
+	ErrStreamLogTaskHasNoStorage = errors.Normalize("stream task has no storage", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskHasNoStorage"))
 	// TODO maybe it belongs to PiTR.
 	ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))
diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel
index e15ba71b6d..b85909f264 100644
--- a/br/pkg/restore/BUILD.bazel
+++ b/br/pkg/restore/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//br/pkg/pdutil",
         "//br/pkg/restore/split",
         "//br/pkg/restore/utils",
+        "//br/pkg/storage",
         "//br/pkg/summary",
         "//br/pkg/utils",
         "//br/pkg/utils/iter",
@@ -28,6 +29,7 @@ go_library(
        "//pkg/parser/ast",
        "//pkg/util",
        "@com_github_go_sql_driver_mysql//:mysql",
+        "@com_github_gogo_protobuf//proto",
        "@com_github_opentracing_opentracing_go//:opentracing-go",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
@@ -50,19 +52,22 @@ go_test(
     name = "restore_test",
     timeout = "short",
     srcs = [
+        "export_test.go",
         "import_mode_switcher_test.go",
         "misc_test.go",
         "restorer_test.go",
     ],
+    embed = [":restore"],
     flaky = True,
-    shard_count = 13,
+    shard_count = 17,
     deps = [
-        ":restore",
         "//br/pkg/conn",
         "//br/pkg/mock",
         "//br/pkg/pdutil",
         "//br/pkg/restore/split",
         "//br/pkg/restore/utils",
+        "//br/pkg/storage",
+        "//br/pkg/utils",
         "//br/pkg/utils/iter",
         "//pkg/kv",
         "//pkg/parser/ast",
diff --git a/br/pkg/restore/export_test.go b/br/pkg/restore/export_test.go
new file mode 100644
index 0000000000..fb10d6ece9
--- /dev/null
+++ b/br/pkg/restore/export_test.go
@@ -0,0 +1,21 @@
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package restore
+
+var (
+	LogRestoreTableIDBlocklistFilePrefix     = logRestoreTableIDBlocklistFilePrefix
+	ParseLogRestoreTableIDsBlocklistFileName = parseLogRestoreTableIDsBlocklistFileName
+	UnmarshalLogRestoreTableIDsBlocklistFile = unmarshalLogRestoreTableIDsBlocklistFile
+)
diff --git a/br/pkg/restore/misc.go b/br/pkg/restore/misc.go
index 80802f359c..c23a8da25f 100644
--- a/br/pkg/restore/misc.go
+++ b/br/pkg/restore/misc.go
@@ -15,15 +15,23 @@
 package restore
 
 import (
+	"bytes"
 	"context"
+	"crypto/sha256"
+	"encoding/base64"
+	"encoding/binary"
 	"fmt"
+	"path"
+	"strconv"
 	"strings"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/logutil"
+	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/pkg/domain"
 	"github.com/pingcap/tidb/pkg/kv"
@@ -34,6 +42,7 @@ import (
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 // deprecated parameter
@@ -44,6 +53,192 @@ const (
 	CoarseGrained Granularity = "coarse-grained"
 )
 
+const logRestoreTableIDBlocklistFilePrefix = "v1/log_restore_tables_blocklists"
+
+type LogRestoreTableIDsBlocklistFile struct {
+	// RestoreCommitTs records the timestamp taken after the PITR restore is done. Only a later PITR restore from the log backup of the cluster,
+	// whose BackupTS is not less than it, can ignore the restore table IDs blocklist recorded in the file.
+	RestoreCommitTs uint64 `protobuf:"varint,1,opt,name=restore_commit_ts,proto3"`
+	// SnapshotBackupTs records the BackupTS of the snapshot backup used by the PITR restore. Any PITR restore from the log backup of the cluster
+	// whose restored TS is less than it can ignore the restore table IDs blocklist recorded in the file.
+	SnapshotBackupTs uint64 `protobuf:"varint,2,opt,name=snapshot_backup_ts,proto3"`
+	// TableIds records the table IDs blocklist of the cluster running the log backup task.
+	TableIds []int64 `protobuf:"varint,3,rep,packed,name=table_ids,proto3"`
+	// Checksum records the checksum of the other fields.
+	Checksum []byte `protobuf:"bytes,4,opt,name=checksum,proto3"`
+}
+
+func (m *LogRestoreTableIDsBlocklistFile) Reset()         { *m = LogRestoreTableIDsBlocklistFile{} }
+func (m *LogRestoreTableIDsBlocklistFile) String() string { return proto.CompactTextString(m) }
+func (m *LogRestoreTableIDsBlocklistFile) ProtoMessage()  {}
+
+func (m *LogRestoreTableIDsBlocklistFile) filename() string {
+	return fmt.Sprintf("%s/R%016X_S%016X.meta", logRestoreTableIDBlocklistFilePrefix, m.RestoreCommitTs, m.SnapshotBackupTs)
+}
+
+func parseLogRestoreTableIDsBlocklistFileName(filename string) (restoreCommitTs, snapshotBackupTs uint64, parsed bool) {
+	filename = path.Base(filename)
+	if !strings.HasSuffix(filename, ".meta") {
+		return 0, 0, false
+	}
+	if filename[0] != 'R' {
+		return 0, 0, false
+	}
+	ts, err := strconv.ParseUint(filename[1:17], 16, 64)
+	if err != nil {
+		log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
+		return 0, 0, false
+	}
+	restoreCommitTs = ts
+	if filename[17] != '_' || filename[18] != 'S' {
+		return 0, 0, false
+	}
+	ts, err = strconv.ParseUint(filename[19:35], 16, 64)
+	if err != nil {
+		log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
+		return 0, 0, false
+	}
+	snapshotBackupTs = ts
+	return restoreCommitTs, snapshotBackupTs, true
+}
+
+func (m *LogRestoreTableIDsBlocklistFile) checksumLogRestoreTableIDsBlocklistFile() []byte {
+	hasher := sha256.New()
+	hasher.Write(binary.LittleEndian.AppendUint64(nil, m.RestoreCommitTs))
+	hasher.Write(binary.LittleEndian.AppendUint64(nil, m.SnapshotBackupTs))
+	for _, tableId := range m.TableIds {
+		hasher.Write(binary.LittleEndian.AppendUint64(nil, uint64(tableId)))
+	}
+	return hasher.Sum(nil)
+}
+
+func (m *LogRestoreTableIDsBlocklistFile) setChecksumLogRestoreTableIDsBlocklistFile() {
+	m.Checksum = m.checksumLogRestoreTableIDsBlocklistFile()
+}
+
+// MarshalLogRestoreTableIDsBlocklistFile generates a blocklist file and marshals it. It returns the filename and the marshaled data.
+func MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, snapshotBackupTs uint64, tableIds []int64) (string, []byte, error) {
+	blocklistFile := &LogRestoreTableIDsBlocklistFile{
+		RestoreCommitTs:  restoreCommitTs,
+		SnapshotBackupTs: snapshotBackupTs,
+		TableIds:         tableIds,
+	}
+	blocklistFile.setChecksumLogRestoreTableIDsBlocklistFile()
+	filename := blocklistFile.filename()
+	data, err := proto.Marshal(blocklistFile)
+	if err != nil {
+		return "", nil, errors.Trace(err)
+	}
+	return filename, data, nil
+}
+
+// unmarshalLogRestoreTableIDsBlocklistFile unmarshals the given blocklist file.
+func unmarshalLogRestoreTableIDsBlocklistFile(data []byte) (restoreCommitTs, snapshotBackupTs uint64, tableIds []int64, err error) {
+	blocklistFile := &LogRestoreTableIDsBlocklistFile{}
+	if err = proto.Unmarshal(data, blocklistFile); err != nil {
+		return 0, 0, nil, errors.Trace(err)
+	}
+	if !bytes.Equal(blocklistFile.checksumLogRestoreTableIDsBlocklistFile(), blocklistFile.Checksum) {
+		return 0, 0, nil, errors.Errorf(
+			"checksum mismatch (calculated checksum is %s but the recorded checksum is %s), the log restore table IDs blocklist file may be corrupted",
+			base64.StdEncoding.EncodeToString(blocklistFile.checksumLogRestoreTableIDsBlocklistFile()),
+			base64.StdEncoding.EncodeToString(blocklistFile.Checksum),
+		)
+	}
+	return blocklistFile.RestoreCommitTs, blocklistFile.SnapshotBackupTs, blocklistFile.TableIds, nil
+}
+
+func fastWalkLogRestoreTableIDsBlocklistFile(
+	ctx context.Context,
+	s storage.ExternalStorage,
+	filterOutFn func(restoreCommitTs, snapshotBackupTs uint64) bool,
+	executionFn func(ctx context.Context, filename string, restoreCommitTs uint64, tableIds []int64) error,
+) error {
+	filenames := make([]string, 0)
+	if err := s.WalkDir(ctx, &storage.WalkOption{SubDir: logRestoreTableIDBlocklistFilePrefix}, func(path string, _ int64) error {
+		restoreCommitTs, snapshotBackupTs, parsed := parseLogRestoreTableIDsBlocklistFileName(path)
+		if parsed {
+			if filterOutFn(restoreCommitTs, snapshotBackupTs) {
+				return nil
+			}
+		}
+		filenames = append(filenames, path)
+		return nil
+	}); err != nil {
+		return errors.Trace(err)
+	}
+	workerpool := tidbutil.NewWorkerPool(8, "walk dir log restore table IDs blocklist files")
+	eg, ectx := errgroup.WithContext(ctx)
+	for _, filename := range filenames {
+		if ectx.Err() != nil {
+			break
+		}
+		workerpool.ApplyOnErrorGroup(eg, func() error {
+			data, err := s.ReadFile(ectx, filename)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			restoreCommitTs, snapshotBackupTs, tableIds, err := unmarshalLogRestoreTableIDsBlocklistFile(data)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if filterOutFn(restoreCommitTs, snapshotBackupTs) {
+				return nil
+			}
+			err = executionFn(ectx, filename, restoreCommitTs, tableIds)
+			return errors.Trace(err)
+		})
+	}
+	return errors.Trace(eg.Wait())
+}
+
+// CheckTableTrackerContainsTableIDsFromBlocklistFiles checks whether the PiTR ID tracker contains any of the filtered table IDs from the blocklist files.
+func CheckTableTrackerContainsTableIDsFromBlocklistFiles(
+	ctx context.Context,
+	s storage.ExternalStorage,
+	tracker *utils.PiTRIdTracker,
+	startTs, restoredTs uint64,
+	tableNameByTableID func(tableID int64) string,
+	checkTableIDLost func(tableId int64) bool,
+) error {
+	err := fastWalkLogRestoreTableIDsBlocklistFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
+		return startTs >= restoreCommitTs || restoredTs <= snapshotBackupTs
+	}, func(_ context.Context, _ string, restoreCommitTs uint64, tableIds []int64) error {
+		for _, tableId := range tableIds {
+			if tracker.ContainsTableId(tableId) || tracker.ContainsPartitionId(tableId) {
+				return errors.Errorf(
+					"cannot restore the table(Id=%d, name=%s at %d) because it was restored by a previous log restore (at %d) after the snapshot backup (at %d) was taken. "+
+						"Please specify a filter that does not contain the table, or use a newer snapshot backup instead.",
+					tableId, tableNameByTableID(tableId), restoredTs, restoreCommitTs, startTs)
+			}
+			// the table's meta KV may not be covered by the log backup
+			if checkTableIDLost(tableId) {
+				return errors.Errorf(
+					"cannot restore the table(Id=%d) because it was restored by a previous log restore (at %d) after the snapshot backup (at %d) was taken. "+
+						"Please specify a filter that does not contain the table, or use a newer snapshot backup instead.",
+					tableId, restoreCommitTs, startTs,
+				)
+			}
+		}
+		return nil
+	})
+	return errors.Trace(err)
+}
+
+// TruncateLogRestoreTableIDsBlocklistFiles removes the blocklist files whose restore commit TS is not larger than the given until TS.
+func TruncateLogRestoreTableIDsBlocklistFiles(
+	ctx context.Context,
+	s storage.ExternalStorage,
+	untilTs uint64,
+) error {
+	err := fastWalkLogRestoreTableIDsBlocklistFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
+		return untilTs < restoreCommitTs
+	}, func(ctx context.Context, filename string, _ uint64, _ []int64) error {
+		return s.DeleteFile(ctx, filename)
+	})
+	return errors.Trace(err)
+}
+
 type UniqueTableName struct {
 	DB    string
 	Table string
diff --git a/br/pkg/restore/misc_test.go b/br/pkg/restore/misc_test.go
index 1b0cb8f84d..5cc314932e 100644
--- a/br/pkg/restore/misc_test.go
+++ b/br/pkg/restore/misc_test.go
@@ -24,6 +24,8 @@ import (
 	"github.com/pingcap/tidb/br/pkg/mock"
 	"github.com/pingcap/tidb/br/pkg/restore"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/session"
@@ -130,3 +132,134 @@ func TestGetTSWithRetry(t *testing.T) {
 		require.NoError(t, err)
 	})
 }
+
+func TestParseLogRestoreTableIDsBlocklistFileName(t *testing.T) {
+	restoreCommitTs, snapshotBackupTs, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName("RFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.meta")
+	require.True(t, parsed)
+	require.Equal(t, uint64(0xFFFFFFFFFFFFFFFF), restoreCommitTs)
+	require.Equal(t, uint64(0xFFFFFFFFFFFFFFFF), snapshotBackupTs)
+	unparsedFilenames := []string{
+		"KFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.meta",
+		"RFFFFFFFFFFFFFFFF.SFFFFFFFFFFFFFFFF.meta",
+		"RFFFFFFFFFFFFFFFF_KFFFFFFFFFFFFFFFF.meta",
+		"RFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.mata",
+		"RFFFFFFFKFFFFFFFF_SFFFFFFFFFFFFFFFF.meta",
+		"RFFFFFFFFFFFFFFFF_SFFFFFFFFKFFFFFFF.meta",
+	}
+	for _, filename := range unparsedFilenames {
+		_, _, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName(filename)
+		require.False(t, parsed)
+	}
+}
+
+func TestLogRestoreTableIDsBlocklistFile(t *testing.T) {
+	ctx := context.Background()
+	base := t.TempDir()
+	stg, err := storage.NewLocalStorage(base)
+	require.NoError(t, err)
+	name, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(0xFFFFFCDEFFFFF, 0xFFFFFFABCFFFF, []int64{1, 2, 3})
+	require.NoError(t, err)
+	restoreCommitTs, snapshotBackupTs, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName(name)
+	require.True(t, parsed)
+	require.Equal(t, uint64(0xFFFFFCDEFFFFF), restoreCommitTs)
+	require.Equal(t, uint64(0xFFFFFFABCFFFF), snapshotBackupTs)
+	err = stg.WriteFile(ctx, name, data)
+	require.NoError(t, err)
+	data, err = stg.ReadFile(ctx, name)
+	require.NoError(t, err)
+	restoreCommitTs, snapshotBackupTs, tableIds, err := restore.UnmarshalLogRestoreTableIDsBlocklistFile(data)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0xFFFFFCDEFFFFF), restoreCommitTs)
+	require.Equal(t, uint64(0xFFFFFFABCFFFF), snapshotBackupTs)
+	require.Equal(t, []int64{1, 2, 3}, tableIds)
+}
+
+func writeBlocklistFile(
+	ctx context.Context, t *testing.T, s storage.ExternalStorage,
+	restoreCommitTs, snapshotBackupTs uint64, tableIds []int64,
+) {
+	name, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, snapshotBackupTs, tableIds)
+	require.NoError(t, err)
+	err = s.WriteFile(ctx, name, data)
+	require.NoError(t, err)
+}
+
+func fakeTrackerID(tableIds []int64) *utils.PiTRIdTracker {
+	tracker := utils.NewPiTRIdTracker()
+	for _, tableId := range tableIds {
+		tracker.TableIdToDBIds[tableId] = make(map[int64]struct{})
+	}
+	return tracker
+}
+
+func TestCheckTableTrackerContainsTableIDsFromBlocklistFiles(t *testing.T) {
+	ctx := context.Background()
+	base := t.TempDir()
+	stg, err := storage.NewLocalStorage(base)
+	require.NoError(t, err)
+	writeBlocklistFile(ctx, t, stg, 100, 10, []int64{100, 101, 102})
+	writeBlocklistFile(ctx, t, stg, 200, 20, []int64{200, 201, 202})
+	writeBlocklistFile(ctx, t, stg, 300, 30, []int64{300, 301, 302})
+	tableNameByTableID := func(tableID int64) string {
+		return fmt.Sprintf("table_%d", tableID)
+	}
+	checkTableIDLost := func(tableId int64) bool {
+		return false
+	}
+	checkTableIDLost2 := func(tableId int64) bool {
+		return true
+	}
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 250, 300, tableNameByTableID, checkTableIDLost)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "table_300")
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 250, 300, tableNameByTableID, checkTableIDLost)
+	require.NoError(t, err)
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 250, 300, tableNameByTableID, checkTableIDLost2)
+	require.Error(t, err)
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 250, 300, tableNameByTableID, checkTableIDLost)
+	require.NoError(t, err)
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 250, 300, tableNameByTableID, checkTableIDLost2)
+	require.Error(t, err)
+
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 1, 25, tableNameByTableID, checkTableIDLost)
+	require.NoError(t, err)
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 1, 25, tableNameByTableID, checkTableIDLost2)
+	require.Error(t, err)
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 1, 25, tableNameByTableID, checkTableIDLost)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "table_200")
+	err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 1, 25, tableNameByTableID, checkTableIDLost)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "table_100")
+}
+
+func filesCount(ctx context.Context, s storage.ExternalStorage) int {
+	count := 0
+	s.WalkDir(ctx, &storage.WalkOption{SubDir: restore.LogRestoreTableIDBlocklistFilePrefix}, func(path string, size int64) error {
+		count += 1
+		return nil
+	})
+	return count
+}
+
+func TestTruncateLogRestoreTableIDsBlocklistFiles(t *testing.T) {
+	ctx := context.Background()
+	base := t.TempDir()
+	stg, err := storage.NewLocalStorage(base)
+	require.NoError(t, err)
+	writeBlocklistFile(ctx, t, stg, 100, 10, []int64{100, 101, 102})
+	writeBlocklistFile(ctx, t, stg, 200, 20, []int64{200, 201, 202})
+	writeBlocklistFile(ctx, t, stg, 300, 30, []int64{300, 301, 302})
+
+	err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 50)
+	require.NoError(t, err)
+	require.Equal(t, 3, filesCount(ctx, stg))
+
+	err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 250)
+	require.NoError(t, err)
+	require.Equal(t, 1, filesCount(ctx, stg))
+
+	err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 350)
+	require.NoError(t, err)
+	require.Equal(t, 0, filesCount(ctx, stg))
+}
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index ff3fb21b2f..1956012464 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -34,6 +34,7 @@ import (
 	snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client"
 	"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
 	restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
+	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
 	"github.com/pingcap/tidb/br/pkg/summary"
 	"github.com/pingcap/tidb/br/pkg/utils"
@@ -785,7 +786,7 @@ func printRestoreMetrics() {
 }
 
 // RunRestore starts a restore task inside the current goroutine.
-func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
+func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) (restoreErr error) {
 	etcdCLI, err := dialEtcdWithCfg(c, cfg.Config)
 	if err != nil {
 		return err
@@ -795,7 +796,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 			log.Error("failed to close the etcd client", zap.Error(err))
 		}
 	}()
-	if err := checkConflictingLogBackup(c, cfg, etcdCLI); err != nil {
+	logTaskBackend, err := checkConflictingLogBackup(c, cfg, IsStreamRestore(cmdName), etcdCLI)
+	if err != nil {
 		return errors.Annotate(err, "failed to check task exists")
 	}
 	closeF, err := registerTaskToPD(c, etcdCLI)
@@ -817,6 +819,38 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	}
 	defer mgr.Close()
 	defer cfg.CloseCheckpointMetaManager()
+	defer func() {
+		if logTaskBackend == nil || restoreErr != nil || cfg.PiTRTableTracker == nil {
+			return
+		}
+		restoreCommitTs, err := restore.GetTSWithRetry(c, mgr.GetPDClient())
+		if err != nil {
+			restoreErr = err
+			return
+		}
+		tableIds := make([]int64, 0, len(cfg.PiTRTableTracker.TableIdToDBIds))
+		for tableId := range cfg.PiTRTableTracker.TableIdToDBIds {
+			tableIds = append(tableIds, tableId)
+		}
+		for tableId := range cfg.PiTRTableTracker.PartitionIds {
+			tableIds = append(tableIds, tableId)
+		}
+		filename, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, cfg.StartTS, tableIds)
+		if err != nil {
+			restoreErr = err
+			return
+		}
+		logTaskStorage, err := storage.Create(c, logTaskBackend, false)
+		if err != nil {
+			restoreErr = err
+			return
+		}
+		log.Info("save the log restore table IDs blocklist into log backup storage")
+		if err = logTaskStorage.WriteFile(c, filename, data); err != nil {
+			restoreErr = err
+			return
+		}
+	}()
 
 	if err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
 		enableFollowerHandleRegion, err := se.GetGlobalSysVar(vardef.PDEnableFollowerHandleRegion)
@@ -830,12 +864,11 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	defer printRestoreMetrics()
 
-	var restoreError error
 	if IsStreamRestore(cmdName) {
 		if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil {
 			return errors.Trace(err)
 		}
-		restoreError = RunStreamRestore(c, mgr, g, cfg)
+		restoreErr = RunStreamRestore(c, mgr, g, cfg)
 	} else {
 		if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBR); err != nil {
 			return errors.Trace(err)
@@ -843,10 +876,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		snapshotRestoreConfig := SnapshotRestoreConfig{
 			RestoreConfig: cfg,
 		}
-		restoreError = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
+		restoreErr = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
 	}
-	if restoreError != nil {
-		return errors.Trace(restoreError)
+	if restoreErr != nil {
+		return errors.Trace(restoreErr)
 	}
 	// Clear the checkpoint data
 	if cfg.UseCheckpoint {
@@ -883,6 +916,7 @@
 type SnapshotRestoreConfig struct {
 	*RestoreConfig
 	piTRTaskInfo           *PiTRTaskInfo
+	logRestoreStorage      storage.ExternalStorage
 	logTableHistoryManager *stream.LogBackupTableHistoryManager
 	tableMappingManager    *stream.TableMappingManager
 }
@@ -1066,13 +1100,14 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 
 	// only run when this full restore is part of the PiTR
 	if isPiTR {
+		snapshotTableMap := client.GetTableMap()
 		// adjust tables to restore in the snapshot restore phase since it will later be renamed during
 		// log restore and will fall into or out of the filter range.
 		err = AdjustTablesToRestoreAndCreateTableTracker(
 			cfg.logTableHistoryManager,
 			cfg.RestoreConfig,
 			client.GetDatabaseMap(),
-			client.GetTableMap(),
+			snapshotTableMap,
 			client.GetPartitionMap(),
 			tableMap,
 			dbMap,
@@ -1084,6 +1119,46 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 		log.Info("adjusted items to restore",
 			zap.Int("tables", len(tableMap)),
 			zap.Int("db", len(dbMap)))
+
+		tableNameByTableID := func(tableID int64) string {
+			dbName, tableName, dbID := "", "", int64(0)
+			history := cfg.logTableHistoryManager.GetTableHistory()
+			if locations, exists := history[tableID]; exists {
+				if name, exists := cfg.logTableHistoryManager.GetDBNameByID(locations[1].DbID); exists {
+					dbName = name
+				}
+				dbID = locations[1].DbID
+				tableName = locations[1].TableName
+			} else if tableMeta, exists := tableMap[tableID]; exists && tableMeta != nil && tableMeta.Info != nil {
+				if tableMeta.DB != nil && len(dbName) == 0 {
+					dbName = tableMeta.DB.Name.O
+				}
+				tableName = tableMeta.Info.Name.O
+			}
+			if len(dbName) == 0 && dbID > 0 {
+				if dbInfo, exists := dbMap[dbID]; exists {
+					dbName = dbInfo.Info.Name.O
+				}
+			}
+			return fmt.Sprintf("%s.%s", dbName, tableName)
+		}
+		checkTableIDLost := func(tableId int64) bool {
+			// check whether the table ID exists in the log backup
+			if _, exists := cfg.logTableHistoryManager.GetTableHistory()[tableId]; exists {
+				return false
+			}
+			// check whether the table ID exists in the snapshot backup
+			if _, exists := snapshotTableMap[tableId]; exists {
+				return false
+			}
+			return true
+		}
+		if err := restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(
+			ctx, cfg.logRestoreStorage, cfg.PiTRTableTracker, backupMeta.GetEndVersion(), cfg.piTRTaskInfo.RestoreTS,
+			tableNameByTableID, checkTableIDLost,
+		); err != nil {
+			return errors.Trace(err)
+		}
 	}
 	tables := utils.Values(tableMap)
 	dbs := utils.Values(dbMap)
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 1839891dce..d8bc5fda9c 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1195,6 +1195,19 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 		}
 	}
 
+	// begin to remove log restore table IDs blocklist files
+	removeLogRestoreTableIDsMarkerFilesDone := console.ShowTask("Removing log restore table IDs blocklist files...", glue.WithTimeCost())
+	defer func() {
+		if removeLogRestoreTableIDsMarkerFilesDone != nil {
+			removeLogRestoreTableIDsMarkerFilesDone()
+		}
+	}()
+	if err := restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, extStorage, cfg.Until); err != nil {
+		return errors.Trace(err)
+	}
+	removeLogRestoreTableIDsMarkerFilesDone()
+	removeLogRestoreTableIDsMarkerFilesDone = nil
+
 	// begin to remove
 	p := console.StartProgressBar(
 		"Truncating Data Files and Metadata", fileCount,
@@ -1232,24 +1245,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 
 // checkConflictingLogBackup checks whether there is a log backup task running.
 // If so, return an error.
-func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error {
+// If the execution is a PITR restore, it returns the external storage backend of the log backup task so that the log restore table IDs blocklist can be recorded there.
+func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, streamRestore bool, etcdCLI *clientv3.Client) (*backuppb.StorageBackend, error) {
 	if err := checkConfigForStatus(cfg.PD); err != nil {
-		return err
+		return nil, err
 	}
 	cli := streamhelper.NewMetaDataClient(etcdCLI)
 	// check log backup task
 	tasks, err := cli.GetAllTasks(ctx)
 	if err != nil {
-		return err
+		return nil, err
+	}
+	if streamRestore && len(tasks) > 0 {
+		if tasks[0].Info.Storage == nil {
+			return nil, errors.Annotatef(berrors.ErrStreamLogTaskHasNoStorage,
+				"cannot save log restore table IDs blocklist file because the external storage backend of the task[%s] is empty", tasks[0].Info.Name)
+		}
+		return tasks[0].Info.Storage, nil
 	}
 	for _, task := range tasks {
 		if err := checkTaskCompat(cfg, task); err != nil {
-			return err
+			return nil, err
 		}
 	}
-	return nil
+	return nil, nil
 }
 
 func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error {
@@ -1257,9 +1278,6 @@ func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error {
 		"You may check the extra information to get rid of this. If that doesn't work, you may "+
 		"stop the task before restore, and after the restore operation finished, "+
 		"create log-backup task again and create a full backup on this cluster.", task.Info.Name)
-	if len(cfg.FullBackupStorage) > 0 {
-		return errors.Annotate(baseErr, "you want to do point in time restore, which isn't compatible with an enabled log backup task yet")
-	}
 	if !cfg.UserFiltered() {
 		return errors.Annotate(baseErr,
 			"you want to restore a whole cluster, you may use `-f` or `restore table|database` to "+
@@ -1395,6 +1413,7 @@ func RunStreamRestore(
 	snapshotRestoreConfig := SnapshotRestoreConfig{
 		RestoreConfig:          cfg,
 		piTRTaskInfo:           taskInfo,
+		logRestoreStorage:      s,
 		logTableHistoryManager: metaInfoProcessor.GetTableHistoryManager(),
 		tableMappingManager:    metaInfoProcessor.GetTableMappingManager(),
 	}
diff --git a/br/tests/br_pitr_log_restore_backup_compatibility/run.sh b/br/tests/br_pitr_log_restore_backup_compatibility/run.sh
new file mode 100644
index 0000000000..71e953b40b
--- /dev/null
+++ b/br/tests/br_pitr_log_restore_backup_compatibility/run.sh
@@ -0,0 +1,143 @@
+#!/bin/bash
+#
+# Copyright 2025 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+# const value
+PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+TASK_NAME="br_pitr_log_restore_backup_compatibility"
+
+restart_services
+
+# prepare the data
+run_sql "create database if not exists test"
+run_sql "create table test.t1 (id int)"
+run_sql "insert into test.t1 values (1), (10), (100)"
+
+# start the log backup task
+echo "start log task"
+run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$PREFIX/log"
+
+# run snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
+
+# prepare the incremental data
+run_sql "create table test.t2(id int)"
+run_sql "insert into test.t1 values (11), (111)"
+run_sql "insert into test.t2 values (2), (20), (200)"
+
+# get a restored ts that is earlier than the second snapshot backup
+sleep 5
+ok_restored_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
+sleep 5
+
+## prepare another log restore
+
+# prepare the data
+run_sql "create table test.t3(id int)"
+run_sql "insert into test.t3 values (3), (30), (300)"
+
+# run snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full2"
+
+# prepare the incremental data
+run_sql "insert into test.t3 values (33), (333)"
+
+# wait checkpoint advance
+sleep 5
+restored_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
+. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
+
+# clean the table test.t3
+run_sql "drop table test.t3"
+
+# run PITR restore
+echo "run PITR restore"
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full2" --filter test.t3 --restored-ts $restored_ts
+
+# check the blocklist file
+if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
+    echo "Error: no blocklist is saved"
+    exit 1
+fi
+
+sleep 5
+truncate_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
+
+# prepare the data
+run_sql "create table test.t4(id int)"
+run_sql "insert into test.t4 values (4), (40), (400)"
+
+# snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full3"
+
+# prepare the incremental data
+run_sql "insert into test.t4 values (44), (444)"
+
+# wait checkpoint advance
+sleep 5
+. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
+
+## test log restore with blocklist
+
+# pass because restored ts is less than BackupTS of snapshot backup 2
+restart_services
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" --restored-ts $ok_restored_ts
+run_sql "select sum(id) as SUM from test.t1"
+check_contains "SUM: 233"
+
+# pass because backup ts is larger than restore commit ts
+restart_services
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full3"
+run_sql "select sum(id) as SUM from test.t1"
+check_contains "SUM: 233"
+run_sql "select sum(id) as SUM from test.t2"
+check_contains "SUM: 222"
+run_sql "select sum(id) as SUM from test.t3"
+check_contains "SUM: 699"
+run_sql "select sum(id) as SUM from test.t4"
+check_contains "SUM: 932"
+
+# otherwise, the restore must fail
+restart_services
+success=true
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 || success=false
+if $success; then
+    echo "Error: PITR restore should have failed"
+    exit 1
+fi
+check_contains "cannot restore the table"
+
+# truncate the blocklist
+run_br log truncate -s "local://$TEST_DIR/$PREFIX/log" --until $ok_restored_ts -y
+if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
+    echo "Error: blocklist should not be truncated yet"
+    exit 1
+fi
+
+run_br log truncate -s "local://$TEST_DIR/$PREFIX/log" --until $truncate_ts -y
+if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
+    echo "blocklist is truncated"
+else
+    echo "Error: blocklist is not truncated"
+    exit 1
+fi
diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh
index 9d7cf9217a..f5f1c461f7 100755
--- a/br/tests/run_group_br_tests.sh
+++ b/br/tests/run_group_br_tests.sh
@@ -25,7 +25,7 @@ groups=(
     ["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
     ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
     ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table br_region_rule'
-    ["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
+    ["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index br_pitr_log_restore_backup_compatibility'
    ["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption br_pitr_online_table_filter'
    ["G07"]='br_pitr br_restore_physical'
    ["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict br_pitr_table_filter'
diff --git a/errors.toml b/errors.toml
index ee74ee9b4f..ed58a83f08 100644
--- a/errors.toml
+++ b/errors.toml
@@ -316,6 +316,11 @@ error = '''
 stream task already exists
 '''
 
+["BR:Stream:ErrStreamLogTaskHasNoStorage"]
+error = '''
+stream task has no storage
+'''
+
 ["Lighting:Restore:ErrChecksumMismatch"]
 error = '''
 checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)