br: compatibility of log backup and log restore (#61238)

close pingcap/tidb#61237
This commit is contained in:
Jianjun Liao
2025-05-28 02:46:58 +08:00
committed by GitHub
parent ed869351f9
commit b871e0281e
10 changed files with 617 additions and 20 deletions

View File

@ -75,7 +75,8 @@ var (
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))
// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
ErrStreamLogTaskHasNoStorage = errors.Normalize("stream task has no storage", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskHasNoStorage"))
// TODO maybe it belongs to PiTR.
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))

View File

@ -18,6 +18,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
@ -28,6 +29,7 @@ go_library(
"//pkg/parser/ast",
"//pkg/util",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
@ -50,19 +52,22 @@ go_test(
name = "restore_test",
timeout = "short",
srcs = [
"export_test.go",
"import_mode_switcher_test.go",
"misc_test.go",
"restorer_test.go",
],
embed = [":restore"],
flaky = True,
shard_count = 13,
shard_count = 17,
deps = [
":restore",
"//br/pkg/conn",
"//br/pkg/mock",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//pkg/kv",
"//pkg/parser/ast",

View File

@ -0,0 +1,21 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package restore
var (
LogRestoreTableIDBlocklistFilePrefix = logRestoreTableIDBlocklistFilePrefix
ParseLogRestoreTableIDsBlocklistFileName = parseLogRestoreTableIDsBlocklistFileName
UnmarshalLogRestoreTableIDsBlocklistFile = unmarshalLogRestoreTableIDsBlocklistFile
)

View File

@ -15,15 +15,23 @@
package restore
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"fmt"
"path"
"strconv"
"strings"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
@ -34,6 +42,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// deprecated parameter
@ -44,6 +53,192 @@ const (
CoarseGrained Granularity = "coarse-grained"
)
const logRestoreTableIDBlocklistFilePrefix = "v1/log_restore_tables_blocklists"
type LogRestoreTableIDsBlocklistFile struct {
// RestoreCommitTs records the timestamp after PITR restore done. Only the later PITR restore from the log backup of the cluster,
// whose BackupTS is not less than it, can ignore the restore table IDs blocklist recorded in the file.
RestoreCommitTs uint64 `protobuf:"varint,1,opt,name=restore_commit_ts,proto3"`
// SnapshotBackupTs records the BackupTS of the PITR restore. Any PITR restore from the log backup of the cluster, whose restoredTS
// is less than it, can ignore the restore table IDs blocklist recorded in the file.
SnapshotBackupTs uint64 `protobuf:"varint,2,opt,name=snapshot_backup_ts,proto3"`
// TableIDs records the table IDs blocklist of the cluster running the log backup task.
TableIds []int64 `protobuf:"varint,3,rep,packed,name=table_ids,proto3"`
// Checksum records the checksum of other fields.
Checksum []byte `protobuf:"bytes,4,opt,name=checksum,proto3"`
}
func (m *LogRestoreTableIDsBlocklistFile) Reset() { *m = LogRestoreTableIDsBlocklistFile{} }
func (m *LogRestoreTableIDsBlocklistFile) String() string { return proto.CompactTextString(m) }
func (m *LogRestoreTableIDsBlocklistFile) ProtoMessage() {}
func (m *LogRestoreTableIDsBlocklistFile) filename() string {
return fmt.Sprintf("%s/R%016X_S%016X.meta", logRestoreTableIDBlocklistFilePrefix, m.RestoreCommitTs, m.SnapshotBackupTs)
}
func parseLogRestoreTableIDsBlocklistFileName(filename string) (restoreCommitTs, snapshotBackupTs uint64, parsed bool) {
filename = path.Base(filename)
if !strings.HasSuffix(filename, ".meta") {
return 0, 0, false
}
if filename[0] != 'R' {
return 0, 0, false
}
ts, err := strconv.ParseUint(filename[1:17], 16, 64)
if err != nil {
log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
return 0, 0, false
}
restoreCommitTs = ts
if filename[17] != '_' || filename[18] != 'S' {
return 0, 0, false
}
ts, err = strconv.ParseUint(filename[19:35], 16, 64)
if err != nil {
log.Warn("failed to parse log restore table IDs blocklist file name", zap.String("filename", filename), zap.Error(err))
return 0, 0, false
}
snapshotBackupTs = ts
return restoreCommitTs, snapshotBackupTs, true
}
func (m *LogRestoreTableIDsBlocklistFile) checksumLogRestoreTableIDsBlocklistFile() []byte {
hasher := sha256.New()
hasher.Write(binary.LittleEndian.AppendUint64(nil, m.RestoreCommitTs))
hasher.Write(binary.LittleEndian.AppendUint64(nil, m.SnapshotBackupTs))
for _, tableId := range m.TableIds {
hasher.Write(binary.LittleEndian.AppendUint64(nil, uint64(tableId)))
}
return hasher.Sum(nil)
}
func (m *LogRestoreTableIDsBlocklistFile) setChecksumLogRestoreTableIDsBlocklistFile() {
m.Checksum = m.checksumLogRestoreTableIDsBlocklistFile()
}
// MarshalLogRestoreTableIDsBlocklistFile generates an Blocklist file and marshals it. It returns its filename and the marshaled data.
func MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, snapshotBackupTs uint64, tableIds []int64) (string, []byte, error) {
blocklistFile := &LogRestoreTableIDsBlocklistFile{
RestoreCommitTs: restoreCommitTs,
SnapshotBackupTs: snapshotBackupTs,
TableIds: tableIds,
}
blocklistFile.setChecksumLogRestoreTableIDsBlocklistFile()
filename := blocklistFile.filename()
data, err := proto.Marshal(blocklistFile)
if err != nil {
return "", nil, errors.Trace(err)
}
return filename, data, nil
}
// unmarshalLogRestoreTableIDsBlocklistFile unmarshals the given blocklist file.
func unmarshalLogRestoreTableIDsBlocklistFile(data []byte) (restoreCommitTs, snapshotBackupTs uint64, tableIds []int64, err error) {
blocklistFile := &LogRestoreTableIDsBlocklistFile{}
if err = proto.Unmarshal(data, blocklistFile); err != nil {
return 0, 0, nil, errors.Trace(err)
}
if !bytes.Equal(blocklistFile.checksumLogRestoreTableIDsBlocklistFile(), blocklistFile.Checksum) {
return 0, 0, nil, errors.Errorf(
"checksum mismatch (calculated checksum is %s but the recorded checksum is %s), the log restore table IDs blocklist file may be corrupted",
base64.StdEncoding.EncodeToString(blocklistFile.checksumLogRestoreTableIDsBlocklistFile()),
base64.StdEncoding.EncodeToString(blocklistFile.Checksum),
)
}
return blocklistFile.RestoreCommitTs, blocklistFile.SnapshotBackupTs, blocklistFile.TableIds, nil
}
func fastWalkLogRestoreTableIDsBlocklistFile(
ctx context.Context,
s storage.ExternalStorage,
filterOutFn func(restoreCommitTs, snapshotBackupTs uint64) bool,
executionFn func(ctx context.Context, filename string, restoreCommitTs uint64, tableIds []int64) error,
) error {
filenames := make([]string, 0)
if err := s.WalkDir(ctx, &storage.WalkOption{SubDir: logRestoreTableIDBlocklistFilePrefix}, func(path string, _ int64) error {
restoreCommitTs, snapshotBackupTs, parsed := parseLogRestoreTableIDsBlocklistFileName(path)
if parsed {
if filterOutFn(restoreCommitTs, snapshotBackupTs) {
return nil
}
}
filenames = append(filenames, path)
return nil
}); err != nil {
return errors.Trace(err)
}
workerpool := tidbutil.NewWorkerPool(8, "walk dir log restore table IDs blocklist files")
eg, ectx := errgroup.WithContext(ctx)
for _, filename := range filenames {
if ectx.Err() != nil {
break
}
workerpool.ApplyOnErrorGroup(eg, func() error {
data, err := s.ReadFile(ectx, filename)
if err != nil {
return errors.Trace(err)
}
restoreCommitTs, snapshotBackupTs, tableIds, err := unmarshalLogRestoreTableIDsBlocklistFile(data)
if err != nil {
return errors.Trace(err)
}
if filterOutFn(restoreCommitTs, snapshotBackupTs) {
return nil
}
err = executionFn(ectx, filename, restoreCommitTs, tableIds)
return errors.Trace(err)
})
}
return errors.Trace(eg.Wait())
}
// CheckTableTrackerContainsTableIDsFromBlocklistFiles checks whether pitr id tracker contains the filtered table IDs from blocklist file.
func CheckTableTrackerContainsTableIDsFromBlocklistFiles(
ctx context.Context,
s storage.ExternalStorage,
tracker *utils.PiTRIdTracker,
startTs, restoredTs uint64,
tableNameByTableID func(tableID int64) string,
checkTableIDLost func(tableId int64) bool,
) error {
err := fastWalkLogRestoreTableIDsBlocklistFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
return startTs >= restoreCommitTs || restoredTs <= snapshotBackupTs
}, func(_ context.Context, _ string, restoreCommitTs uint64, tableIds []int64) error {
for _, tableId := range tableIds {
if tracker.ContainsTableId(tableId) || tracker.ContainsPartitionId(tableId) {
return errors.Errorf(
"cannot restore the table(Id=%d, name=%s at %d) because it is log restored(at %d) before snapshot backup(at %d). "+
"Please respecify the filter that does not contain the table or replace with a newer snapshot backup.",
tableId, tableNameByTableID(tableId), restoredTs, restoreCommitTs, startTs)
}
// the meta kv may not be backed by log restore
if checkTableIDLost(tableId) {
return errors.Errorf(
"cannot restore the table(Id=%d) because it is log restored(at %d) before snapshot backup(at %d). "+
"Please respecify the filter that does not contain the table or replace with a newer snapshot backup.",
tableId, restoreCommitTs, startTs,
)
}
}
return nil
})
return errors.Trace(err)
}
// TruncateLogRestoreTableIDsBlocklistFiles truncates the blocklist files whose restore commit ts is not larger than truncate until ts.
func TruncateLogRestoreTableIDsBlocklistFiles(
ctx context.Context,
s storage.ExternalStorage,
untilTs uint64,
) error {
err := fastWalkLogRestoreTableIDsBlocklistFile(ctx, s, func(restoreCommitTs, snapshotBackupTs uint64) bool {
return untilTs < restoreCommitTs
}, func(ctx context.Context, filename string, _ uint64, _ []int64) error {
return s.DeleteFile(ctx, filename)
})
return errors.Trace(err)
}
type UniqueTableName struct {
DB string
Table string

View File

@ -24,6 +24,8 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/session"
@ -130,3 +132,134 @@ func TestGetTSWithRetry(t *testing.T) {
require.NoError(t, err)
})
}
func TestParseLogRestoreTableIDsBlocklistFileName(t *testing.T) {
restoreCommitTs, snapshotBackupTs, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName("RFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.meta")
require.True(t, parsed)
require.Equal(t, uint64(0xFFFFFFFFFFFFFFFF), restoreCommitTs)
require.Equal(t, uint64(0xFFFFFFFFFFFFFFFF), snapshotBackupTs)
unparsedFilenames := []string{
"KFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.meta",
"RFFFFFFFFFFFFFFFF.SFFFFFFFFFFFFFFFF.meta",
"RFFFFFFFFFFFFFFFF_KFFFFFFFFFFFFFFFF.meta",
"RFFFFFFFFFFFFFFFF_SFFFFFFFFFFFFFFFF.mata",
"RFFFFFFFKFFFFFFFF_SFFFFFFFFFFFFFFFF.meta",
"RFFFFFFFFFFFFFFFF_SFFFFFFFFKFFFFFFF.meta",
}
for _, filename := range unparsedFilenames {
_, _, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName(filename)
require.False(t, parsed)
}
}
func TestLogRestoreTableIDsBlocklistFile(t *testing.T) {
ctx := context.Background()
base := t.TempDir()
stg, err := storage.NewLocalStorage(base)
require.NoError(t, err)
name, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(0xFFFFFCDEFFFFF, 0xFFFFFFABCFFFF, []int64{1, 2, 3})
require.NoError(t, err)
restoreCommitTs, snapshotBackupTs, parsed := restore.ParseLogRestoreTableIDsBlocklistFileName(name)
require.True(t, parsed)
require.Equal(t, uint64(0xFFFFFCDEFFFFF), restoreCommitTs)
require.Equal(t, uint64(0xFFFFFFABCFFFF), snapshotBackupTs)
err = stg.WriteFile(ctx, name, data)
require.NoError(t, err)
data, err = stg.ReadFile(ctx, name)
require.NoError(t, err)
restoreCommitTs, snapshotBackupTs, tableIds, err := restore.UnmarshalLogRestoreTableIDsBlocklistFile(data)
require.NoError(t, err)
require.Equal(t, uint64(0xFFFFFCDEFFFFF), restoreCommitTs)
require.Equal(t, uint64(0xFFFFFFABCFFFF), snapshotBackupTs)
require.Equal(t, []int64{1, 2, 3}, tableIds)
}
func writeBlocklistFile(
ctx context.Context, t *testing.T, s storage.ExternalStorage,
restoreCommitTs, snapshotBackupTs uint64, tableIds []int64,
) {
name, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, snapshotBackupTs, tableIds)
require.NoError(t, err)
err = s.WriteFile(ctx, name, data)
require.NoError(t, err)
}
func fakeTrackerID(tableIds []int64) *utils.PiTRIdTracker {
tracker := utils.NewPiTRIdTracker()
for _, tableId := range tableIds {
tracker.TableIdToDBIds[tableId] = make(map[int64]struct{})
}
return tracker
}
func TestCheckTableTrackerContainsTableIDsFromBlocklistFiles(t *testing.T) {
ctx := context.Background()
base := t.TempDir()
stg, err := storage.NewLocalStorage(base)
require.NoError(t, err)
writeBlocklistFile(ctx, t, stg, 100, 10, []int64{100, 101, 102})
writeBlocklistFile(ctx, t, stg, 200, 20, []int64{200, 201, 202})
writeBlocklistFile(ctx, t, stg, 300, 30, []int64{300, 301, 302})
tableNameByTableID := func(tableID int64) string {
return fmt.Sprintf("table_%d", tableID)
}
checkTableIDLost := func(tableId int64) bool {
return false
}
checkTableIDLost2 := func(tableId int64) bool {
return true
}
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 250, 300, tableNameByTableID, checkTableIDLost)
require.Error(t, err)
require.Contains(t, err.Error(), "table_300")
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 250, 300, tableNameByTableID, checkTableIDLost)
require.NoError(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 250, 300, tableNameByTableID, checkTableIDLost2)
require.Error(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 250, 300, tableNameByTableID, checkTableIDLost)
require.NoError(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 250, 300, tableNameByTableID, checkTableIDLost2)
require.Error(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 1, 25, tableNameByTableID, checkTableIDLost)
require.NoError(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{300, 301, 302}), 1, 25, tableNameByTableID, checkTableIDLost2)
require.Error(t, err)
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{200, 201, 202}), 1, 25, tableNameByTableID, checkTableIDLost)
require.Error(t, err)
require.Contains(t, err.Error(), "table_200")
err = restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(ctx, stg, fakeTrackerID([]int64{100, 101, 102}), 1, 25, tableNameByTableID, checkTableIDLost)
require.Error(t, err)
require.Contains(t, err.Error(), "table_100")
}
func filesCount(ctx context.Context, s storage.ExternalStorage) int {
count := 0
s.WalkDir(ctx, &storage.WalkOption{SubDir: restore.LogRestoreTableIDBlocklistFilePrefix}, func(path string, size int64) error {
count += 1
return nil
})
return count
}
func TestTruncateLogRestoreTableIDsBlocklistFiles(t *testing.T) {
ctx := context.Background()
base := t.TempDir()
stg, err := storage.NewLocalStorage(base)
require.NoError(t, err)
writeBlocklistFile(ctx, t, stg, 100, 10, []int64{100, 101, 102})
writeBlocklistFile(ctx, t, stg, 200, 20, []int64{200, 201, 202})
writeBlocklistFile(ctx, t, stg, 300, 30, []int64{300, 301, 302})
err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 50)
require.NoError(t, err)
require.Equal(t, 3, filesCount(ctx, stg))
err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 250)
require.NoError(t, err)
require.Equal(t, 1, filesCount(ctx, stg))
err = restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, stg, 350)
require.NoError(t, err)
require.Equal(t, 0, filesCount(ctx, stg))
}

View File

@ -34,6 +34,7 @@ import (
snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
@ -785,7 +786,7 @@ func printRestoreMetrics() {
}
// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) (restoreErr error) {
etcdCLI, err := dialEtcdWithCfg(c, cfg.Config)
if err != nil {
return err
@ -795,7 +796,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
log.Error("failed to close the etcd client", zap.Error(err))
}
}()
if err := checkConflictingLogBackup(c, cfg, etcdCLI); err != nil {
logTaskBackend, err := checkConflictingLogBackup(c, cfg, IsStreamRestore(cmdName), etcdCLI)
if err != nil {
return errors.Annotate(err, "failed to check task exists")
}
closeF, err := registerTaskToPD(c, etcdCLI)
@ -817,6 +819,38 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}
defer mgr.Close()
defer cfg.CloseCheckpointMetaManager()
defer func() {
if logTaskBackend == nil || restoreErr != nil || cfg.PiTRTableTracker == nil {
return
}
restoreCommitTs, err := restore.GetTSWithRetry(c, mgr.GetPDClient())
if err != nil {
restoreErr = err
return
}
tableIds := make([]int64, 0, len(cfg.PiTRTableTracker.TableIdToDBIds))
for tableId := range cfg.PiTRTableTracker.TableIdToDBIds {
tableIds = append(tableIds, tableId)
}
for tableId := range cfg.PiTRTableTracker.PartitionIds {
tableIds = append(tableIds, tableId)
}
filename, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, cfg.StartTS, tableIds)
if err != nil {
restoreErr = err
return
}
logTaskStorage, err := storage.Create(c, logTaskBackend, false)
if err != nil {
restoreErr = err
return
}
log.Info("save the log restore table IDs blocklist into log backup storage")
if err = logTaskStorage.WriteFile(c, filename, data); err != nil {
restoreErr = err
return
}
}()
if err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
enableFollowerHandleRegion, err := se.GetGlobalSysVar(vardef.PDEnableFollowerHandleRegion)
@ -830,12 +864,11 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
defer printRestoreMetrics()
var restoreError error
if IsStreamRestore(cmdName) {
if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil {
return errors.Trace(err)
}
restoreError = RunStreamRestore(c, mgr, g, cfg)
restoreErr = RunStreamRestore(c, mgr, g, cfg)
} else {
if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBR); err != nil {
return errors.Trace(err)
@ -843,10 +876,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
snapshotRestoreConfig := SnapshotRestoreConfig{
RestoreConfig: cfg,
}
restoreError = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
restoreErr = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
}
if restoreError != nil {
return errors.Trace(restoreError)
if restoreErr != nil {
return errors.Trace(restoreErr)
}
// Clear the checkpoint data
if cfg.UseCheckpoint {
@ -883,6 +916,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
type SnapshotRestoreConfig struct {
*RestoreConfig
piTRTaskInfo *PiTRTaskInfo
logRestoreStorage storage.ExternalStorage
logTableHistoryManager *stream.LogBackupTableHistoryManager
tableMappingManager *stream.TableMappingManager
}
@ -1066,13 +1100,14 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
// only run when this full restore is part of the PiTR
if isPiTR {
snapshotTableMap := client.GetTableMap()
// adjust tables to restore in the snapshot restore phase since it will later be renamed during
// log restore and will fall into or out of the filter range.
err = AdjustTablesToRestoreAndCreateTableTracker(
cfg.logTableHistoryManager,
cfg.RestoreConfig,
client.GetDatabaseMap(),
client.GetTableMap(),
snapshotTableMap,
client.GetPartitionMap(),
tableMap,
dbMap,
@ -1084,6 +1119,46 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
log.Info("adjusted items to restore",
zap.Int("tables", len(tableMap)),
zap.Int("db", len(dbMap)))
tableNameByTableID := func(tableID int64) string {
dbName, tableName, dbID := "", "", int64(0)
history := cfg.logTableHistoryManager.GetTableHistory()
if locations, exists := history[tableID]; exists {
if name, exists := cfg.logTableHistoryManager.GetDBNameByID(locations[1].DbID); exists {
dbName = name
}
dbID = locations[1].DbID
tableName = locations[1].TableName
} else if tableMeta, exists := tableMap[tableID]; exists && tableMeta != nil && tableMeta.Info != nil {
if tableMeta.DB != nil && len(dbName) == 0 {
dbName = tableMeta.DB.Name.O
}
tableName = tableMeta.Info.Name.O
}
if len(dbName) == 0 && dbID > 0 {
if dbInfo, exists := dbMap[dbID]; exists {
dbName = dbInfo.Info.Name.O
}
}
return fmt.Sprintf("%s.%s", dbName, tableName)
}
checkTableIDLost := func(tableId int64) bool {
// check whether exists in log backup
if _, exists := cfg.logTableHistoryManager.GetTableHistory()[tableId]; exists {
return false
}
// check whether exists in snapshot backup
if _, exists := snapshotTableMap[tableId]; exists {
return false
}
return true
}
if err := restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(
ctx, cfg.logRestoreStorage, cfg.PiTRTableTracker, backupMeta.GetEndVersion(), cfg.piTRTaskInfo.RestoreTS,
tableNameByTableID, checkTableIDLost,
); err != nil {
return errors.Trace(err)
}
}
tables := utils.Values(tableMap)
dbs := utils.Values(dbMap)

View File

@ -1195,6 +1195,19 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
}
}
// begin to remove log restore table IDs blocklist files
removeLogRestoreTableIDsMarkerFilesDone := console.ShowTask("Removing log restore table IDs blocklist files...", glue.WithTimeCost())
defer func() {
if removeLogRestoreTableIDsMarkerFilesDone != nil {
removeLogRestoreTableIDsMarkerFilesDone()
}
}()
if err := restore.TruncateLogRestoreTableIDsBlocklistFiles(ctx, extStorage, cfg.Until); err != nil {
return errors.Trace(err)
}
removeLogRestoreTableIDsMarkerFilesDone()
removeLogRestoreTableIDsMarkerFilesDone = nil
// begin to remove
p := console.StartProgressBar(
"Truncating Data Files and Metadata", fileCount,
@ -1232,24 +1245,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
// checkConflictingLogBackup checks whether there is a log backup task running.
// If so, return an error.
func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error {
// If the execution is PITR restore, returns the external storage backend of taskInfos to record log restore table ids marker.
func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, streamRestore bool, etcdCLI *clientv3.Client) (*backuppb.StorageBackend, error) {
if err := checkConfigForStatus(cfg.PD); err != nil {
return err
return nil, err
}
cli := streamhelper.NewMetaDataClient(etcdCLI)
// check log backup task
tasks, err := cli.GetAllTasks(ctx)
if err != nil {
return err
return nil, err
}
if streamRestore && len(tasks) > 0 {
if tasks[0].Info.Storage == nil {
return nil, errors.Annotatef(berrors.ErrStreamLogTaskHasNoStorage,
"cannot save log restore table IDs blocklist file because the external storage backend of the task[%s] is empty", tasks[0].Info.Name)
}
return tasks[0].Info.Storage, nil
}
for _, task := range tasks {
if err := checkTaskCompat(cfg, task); err != nil {
return err
return nil, err
}
}
return nil
return nil, nil
}
func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error {
@ -1257,9 +1278,6 @@ func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error {
"You may check the extra information to get rid of this. If that doesn't work, you may "+
"stop the task before restore, and after the restore operation finished, "+
"create log-backup task again and create a full backup on this cluster.", task.Info.Name)
if len(cfg.FullBackupStorage) > 0 {
return errors.Annotate(baseErr, "you want to do point in time restore, which isn't compatible with an enabled log backup task yet")
}
if !cfg.UserFiltered() {
return errors.Annotate(baseErr,
"you want to restore a whole cluster, you may use `-f` or `restore table|database` to "+
@ -1395,6 +1413,7 @@ func RunStreamRestore(
snapshotRestoreConfig := SnapshotRestoreConfig{
RestoreConfig: cfg,
piTRTaskInfo: taskInfo,
logRestoreStorage: s,
logTableHistoryManager: metaInfoProcessor.GetTableHistoryManager(),
tableMappingManager: metaInfoProcessor.GetTableMappingManager(),
}

View File

@ -0,0 +1,143 @@
#!/bin/bash
#
# Copyright 2025 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)
# const value
PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
TASK_NAME="br_pitr_log_restore_backup_compatibility"
restart_services
# prepare the data
run_sql "create database if not exists test"
run_sql "create table test.t1 (id int)"
run_sql "insert into test.t1 values (1), (10), (100)"
# start the log backup task
echo "start log task"
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$PREFIX/log"
# run snapshot backup
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
# prepare the incremental data
run_sql "create table test.t2(id int)"
run_sql "insert into test.t1 values (11), (111)"
run_sql "insert into test.t2 values (2), (20), (200)"
# get the checkpoint ts
sleep 5
ok_restored_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
sleep 5
## prepare another log restore
# prepare the data
run_sql "create table test.t3(id int)"
run_sql "insert into test.t3 values (3), (30), (300)"
# run snapshot backup
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full2"
# prepare the incremental data
run_sql "insert into test.t3 values (33), (333)"
# wait checkpoint advance
sleep 5
restored_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
# clean the table test.t3
run_sql "drop table test.t3"
# run PITR restore
echo "run PITR restore"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full2" --filter test.t3 --restored-ts $restored_ts
# check the blocklist file
if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
echo "Error: no blocklist is saved"
exit 1
fi
sleep 5
truncate_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
# prepare the data
run_sql "create table test.t4(id int)"
run_sql "insert into test.t4 values (4), (40), (400)"
# snapshot backup
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full3"
# prepare the incremental data
run_sql "insert into test.t4 values (44), (444)"
# wait checkpoint advance
sleep 5
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
## test log restore with block list
# pass because restored ts is less than BackupTS of snapshot backup 2
restart_services
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" --restored-ts $ok_restored_ts
run_sql "select sum(id) as SUM from test.t1"
check_contains "SUM: 233"
# pass because backup ts is larger than restore commit ts
restart_services
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full3"
run_sql "select sum(id) as SUM from test.t1"
check_contains "SUM: 233"
run_sql "select sum(id) as SUM from test.t2"
check_contains "SUM: 222"
run_sql "select sum(id) as SUM from test.t3"
check_contains "SUM: 699"
run_sql "select sum(id) as SUM from test.t4"
check_contains "SUM: 932"
# otherwise, failed
restart_services
success=true
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 || success=false
if $success; then
echo "Error: PITR restore must be failed"
exit 1
fi
check_contains "cannot restore the table"
# truncate the blocklist
run_br log truncate -s "local://$TEST_DIR/$PREFIX/log" --until $ok_restored_ts -y
if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
echo "Error: blocklist is truncated"
exit 1
fi
run_br log truncate -s "local://$TEST_DIR/$PREFIX/log" --until $truncate_ts -y
if [ -z "$(ls -A $TEST_DIR/$PREFIX/log/v1/log_restore_tables_blocklists)" ]; then
echo "blocklist is truncated"
else
echo "Error: blocklist is not truncated"
exit 1
fi

View File

@ -25,7 +25,7 @@ groups=(
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table br_region_rule'
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index br_pitr_log_restore_backup_compatibility'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption br_pitr_online_table_filter'
["G07"]='br_pitr br_restore_physical'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict br_pitr_table_filter'

View File

@ -316,6 +316,11 @@ error = '''
stream task already exists
'''
["BR:Stream:ErrStreamLogTaskHasNoStorage"]
error = '''
stream task has no storage
'''
["Lighting:Restore:ErrChecksumMismatch"]
error = '''
checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)