br: add tool to clean up registry and checkpoints (#61977)
close pingcap/tidb#61964, close pingcap/tidb#62011
This commit is contained in:
@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "br_lib",
|
||||
srcs = [
|
||||
"abort.go",
|
||||
"backup.go",
|
||||
"cmd.go",
|
||||
"debug.go",
|
||||
|
||||
141
br/cmd/br/abort.go
Normal file
141
br/cmd/br/abort.go
Normal file
@ -0,0 +1,141 @@
|
||||
// Copyright 2025 PingCAP, Inc. Licensed under Apache-2.0.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/log"
|
||||
"github.com/pingcap/tidb/br/pkg/task"
|
||||
"github.com/pingcap/tidb/br/pkg/trace"
|
||||
"github.com/pingcap/tidb/br/pkg/version/build"
|
||||
"github.com/pingcap/tidb/pkg/util/logutil"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
"sourcegraph.com/sourcegraph/appdash"
|
||||
)
|
||||
|
||||
// NewAbortCommand returns an abort subcommand
|
||||
func NewAbortCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "abort",
|
||||
Short: "abort restore tasks",
|
||||
SilenceUsage: true,
|
||||
PersistentPreRunE: func(c *cobra.Command, args []string) error {
|
||||
if err := Init(c); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
build.LogInfo(build.BR)
|
||||
logutil.LogEnvVariables()
|
||||
task.LogArguments(c)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
command.AddCommand(
|
||||
newAbortRestoreCommand(),
|
||||
// future: newAbortBackupCommand(),
|
||||
)
|
||||
task.DefineRestoreFlags(command.PersistentFlags())
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
// newAbortRestoreCommand returns an abort restore subcommand
|
||||
func newAbortRestoreCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "restore",
|
||||
Short: "abort restore tasks",
|
||||
SilenceUsage: true,
|
||||
}
|
||||
|
||||
command.AddCommand(
|
||||
newAbortRestoreFullCommand(),
|
||||
newAbortRestoreDBCommand(),
|
||||
newAbortRestoreTableCommand(),
|
||||
newAbortRestorePointCommand(),
|
||||
)
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
func newAbortRestoreFullCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "full",
|
||||
Short: "abort a full restore task",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runAbortRestoreCommand(cmd, task.FullRestoreCmd)
|
||||
},
|
||||
}
|
||||
// define flags specific to full restore
|
||||
task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, false)
|
||||
task.DefineRestoreSnapshotFlags(command)
|
||||
return command
|
||||
}
|
||||
|
||||
func newAbortRestoreDBCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "db",
|
||||
Short: "abort a database restore task",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runAbortRestoreCommand(cmd, task.DBRestoreCmd)
|
||||
},
|
||||
}
|
||||
task.DefineDatabaseFlags(command)
|
||||
return command
|
||||
}
|
||||
|
||||
func newAbortRestoreTableCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "table",
|
||||
Short: "abort a table restore task",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runAbortRestoreCommand(cmd, task.TableRestoreCmd)
|
||||
},
|
||||
}
|
||||
task.DefineTableFlags(command)
|
||||
return command
|
||||
}
|
||||
|
||||
func newAbortRestorePointCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "point",
|
||||
Short: "abort a point-in-time restore task",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runAbortRestoreCommand(cmd, task.PointRestoreCmd)
|
||||
},
|
||||
}
|
||||
task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, true)
|
||||
task.DefineStreamRestoreFlags(command)
|
||||
return command
|
||||
}
|
||||
|
||||
func runAbortRestoreCommand(command *cobra.Command, cmdName string) error {
|
||||
cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
|
||||
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
|
||||
command.SilenceUsage = false
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if task.IsStreamRestore(cmdName) {
|
||||
if err := cfg.ParseStreamRestoreFlags(command.Flags()); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
ctx := GetDefaultContext()
|
||||
if cfg.EnableOpenTracing {
|
||||
var store *appdash.MemoryStore
|
||||
ctx, store = trace.TracerStartSpan(ctx)
|
||||
defer trace.TracerFinishSpan(ctx, store)
|
||||
}
|
||||
|
||||
if err := task.RunRestoreAbort(ctx, tidbGlue, cmdName, &cfg); err != nil {
|
||||
log.Error("failed to abort restore task", zap.Error(err))
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -29,6 +29,7 @@ func main() {
|
||||
NewRestoreCommand(),
|
||||
NewStreamCommand(),
|
||||
newOperatorCommand(),
|
||||
NewAbortCommand(),
|
||||
)
|
||||
// Outputs cmd.Print to stdout.
|
||||
rootCmd.SetOut(os.Stdout)
|
||||
|
||||
@ -66,6 +66,13 @@ const (
|
||||
SET status = %%?
|
||||
WHERE id = %%? AND status = %%?`
|
||||
|
||||
// updateStatusFromMultipleSQLTemplate is the SQL template for updating a task's status
|
||||
// when the current status can be one of multiple values
|
||||
updateStatusFromMultipleSQLTemplate = `
|
||||
UPDATE %s.%s
|
||||
SET status = %%?
|
||||
WHERE id = %%? AND status IN (%s)`
|
||||
|
||||
// resumeTaskByIDSQLTemplate is the SQL template for resuming a paused task by its ID
|
||||
resumeTaskByIDSQLTemplate = `
|
||||
UPDATE %s.%s
|
||||
@ -410,20 +417,38 @@ func (r *Registry) collectResettingStatusTasks(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateTaskStatusConditional updates a task's status only if its current status matches the expected status
|
||||
func (r *Registry) updateTaskStatusConditional(ctx context.Context, restoreID uint64, currentStatus,
|
||||
// updateTaskStatusFromMultiple updates a task's status only if its current status matches one of the expected statuses
|
||||
func (r *Registry) updateTaskStatusFromMultiple(ctx context.Context, restoreID uint64, currentStatuses []TaskStatus,
|
||||
newStatus TaskStatus) error {
|
||||
log.Info("attempting to update task status",
|
||||
if len(currentStatuses) == 0 {
|
||||
return errors.New("currentStatuses cannot be empty")
|
||||
}
|
||||
|
||||
// build the status list for the IN clause
|
||||
statusList := make([]string, len(currentStatuses))
|
||||
for i, status := range currentStatuses {
|
||||
statusList[i] = fmt.Sprintf("'%s'", string(status))
|
||||
}
|
||||
statusInClause := strings.Join(statusList, ", ")
|
||||
|
||||
log.Info("attempting to update task status from multiple possible statuses",
|
||||
zap.Uint64("restore_id", restoreID),
|
||||
zap.String("current_status", string(currentStatus)),
|
||||
zap.Strings("current_statuses", func() []string {
|
||||
result := make([]string, len(currentStatuses))
|
||||
for i, s := range currentStatuses {
|
||||
result[i] = string(s)
|
||||
}
|
||||
return result
|
||||
}()),
|
||||
zap.String("new_status", string(newStatus)))
|
||||
|
||||
// use where to update only when status is what we want
|
||||
updateSQL := fmt.Sprintf(updateStatusSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
|
||||
// use where to update only when status is one of the expected values
|
||||
updateSQL := fmt.Sprintf(updateStatusFromMultipleSQLTemplate,
|
||||
RestoreRegistryDBName, RestoreRegistryTableName, statusInClause)
|
||||
|
||||
if err := r.se.ExecuteInternal(ctx, updateSQL, newStatus, restoreID, currentStatus); err != nil {
|
||||
return errors.Annotatef(err, "failed to conditionally update task status from %s to %s",
|
||||
currentStatus, newStatus)
|
||||
if err := r.se.ExecuteInternal(ctx, updateSQL, newStatus, restoreID); err != nil {
|
||||
return errors.Annotatef(err, "failed to conditionally update task status from %v to %s",
|
||||
currentStatuses, newStatus)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -443,11 +468,12 @@ func (r *Registry) Unregister(ctx context.Context, restoreID uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PauseTask marks a task as paused only if it's currently running
|
||||
// PauseTask marks a task as paused only if it's currently running or resetting
|
||||
func (r *Registry) PauseTask(ctx context.Context, restoreID uint64) error {
|
||||
// first stop heartbeat manager
|
||||
r.StopHeartbeatManager()
|
||||
return r.updateTaskStatusConditional(ctx, restoreID, TaskStatusRunning, TaskStatusPaused)
|
||||
return r.updateTaskStatusFromMultiple(ctx, restoreID,
|
||||
[]TaskStatus{TaskStatusRunning, TaskStatusResetting}, TaskStatusPaused)
|
||||
}
|
||||
|
||||
// GetRegistrationsByMaxID returns all registrations with IDs smaller than maxID
|
||||
@ -900,3 +926,144 @@ func (r *Registry) GlobalOperationAfterSetResettingStatus(ctx context.Context,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindAndDeleteMatchingTask finds and deletes the registry entry that matches the given restore configuration
|
||||
// This is used for the abort functionality to clean up the matching task
|
||||
// Similar to ResumeOrCreateRegistration, it first resolves the restoredTS then finds and deletes the matching
|
||||
// paused task
|
||||
// Returns the deleted task ID, or 0 if no matching task was found
|
||||
func (r *Registry) FindAndDeleteMatchingTask(ctx context.Context,
|
||||
info RegistrationInfo, isRestoredTSUserSpecified bool) (uint64, error) {
|
||||
// resolve which restoredTS to use
|
||||
resolvedRestoreTS, err := r.resolveRestoreTS(ctx, info, isRestoredTSUserSpecified)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// update info with resolved restoredTS if different
|
||||
if resolvedRestoreTS != info.RestoredTS {
|
||||
log.Info("using resolved restoredTS for abort operation",
|
||||
zap.Uint64("original_restored_ts", info.RestoredTS),
|
||||
zap.Uint64("resolved_restored_ts", resolvedRestoreTS))
|
||||
info.RestoredTS = resolvedRestoreTS
|
||||
}
|
||||
|
||||
filterStrings := strings.Join(info.FilterStrings, FilterSeparator)
|
||||
|
||||
log.Info("searching for matching task to delete",
|
||||
zap.String("filter_strings", filterStrings),
|
||||
zap.Uint64("start_ts", info.StartTS),
|
||||
zap.Uint64("restored_ts", info.RestoredTS),
|
||||
zap.Uint64("upstream_cluster_id", info.UpstreamClusterID),
|
||||
zap.Bool("with_sys_table", info.WithSysTable),
|
||||
zap.String("cmd", info.Cmd))
|
||||
|
||||
var deletedTaskID uint64
|
||||
|
||||
err = r.executeInTransaction(ctx, func(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor,
|
||||
sessionOpts []sqlexec.OptionFuncAlias) error {
|
||||
// find and lock the task that matches the configuration
|
||||
lookupSQL := fmt.Sprintf(lookupRegistrationSQLTemplate,
|
||||
RestoreRegistryDBName, RestoreRegistryTableName)
|
||||
rows, _, err := execCtx.ExecRestrictedSQL(ctx, sessionOpts, lookupSQL,
|
||||
filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd)
|
||||
if err != nil {
|
||||
return errors.Annotate(err, "failed to lookup matching task")
|
||||
}
|
||||
|
||||
if len(rows) == 0 {
|
||||
log.Info("no matching task found to delete")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(rows) > 1 {
|
||||
log.Error("multiple matching tasks found, this is unexpected and indicates a bug",
|
||||
zap.Int("count", len(rows)))
|
||||
return errors.Annotatef(berrors.ErrInvalidArgument,
|
||||
"found %d matching tasks, expected exactly 1", len(rows))
|
||||
}
|
||||
|
||||
// get the single matching task (now locked)
|
||||
taskID := rows[0].GetUint64(0)
|
||||
status := rows[0].GetString(1)
|
||||
|
||||
log.Info("found and locked matching task",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.String("status", status))
|
||||
|
||||
// handle different task statuses
|
||||
if status == string(TaskStatusPaused) {
|
||||
// paused tasks can be directly deleted
|
||||
} else if status == string(TaskStatusRunning) || status == string(TaskStatusResetting) {
|
||||
// for running/resetting tasks, check if they are stale (dead processes)
|
||||
log.Info("task is running/resetting, checking if it's stale before abort",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.String("status", status))
|
||||
|
||||
// get the task's heartbeat time to check if it's stale
|
||||
heartbeatSQL := fmt.Sprintf(selectTaskHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
|
||||
heartbeatRows, _, heartbeatErr := execCtx.ExecRestrictedSQL(ctx, sessionOpts, heartbeatSQL, taskID)
|
||||
if heartbeatErr != nil {
|
||||
log.Warn("failed to check task heartbeat during abort, skipping",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.Error(heartbeatErr))
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(heartbeatRows) == 0 {
|
||||
log.Warn("task not found when checking heartbeat, skipping abort",
|
||||
zap.Uint64("task_id", taskID))
|
||||
return nil
|
||||
}
|
||||
|
||||
initialHeartbeatTime := heartbeatRows[0].GetTime(0).String()
|
||||
|
||||
// check if the task is stale (not updating heartbeat)
|
||||
isStale, staleErr := r.isTaskStale(ctx, taskID, initialHeartbeatTime)
|
||||
if staleErr != nil {
|
||||
log.Warn("failed to determine if task is stale, skipping abort",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.Error(staleErr))
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isStale {
|
||||
log.Info("task is actively running, cannot abort",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.String("status", status))
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info("task is stale, proceeding with abort",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.String("status", status))
|
||||
} else {
|
||||
log.Error("task is in unexpected status, cannot abort",
|
||||
zap.Uint64("task_id", taskID),
|
||||
zap.String("status", status))
|
||||
return nil
|
||||
}
|
||||
|
||||
// delete the paused task
|
||||
deleteSQL := fmt.Sprintf(deleteRegistrationSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
|
||||
_, _, err = execCtx.ExecRestrictedSQL(ctx, sessionOpts, deleteSQL, taskID)
|
||||
if err != nil {
|
||||
return errors.Annotatef(err, "failed to delete task %d", taskID)
|
||||
}
|
||||
|
||||
deletedTaskID = taskID
|
||||
log.Info("successfully deleted matching paused task", zap.Uint64("task_id", taskID))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if deletedTaskID != 0 {
|
||||
log.Info("successfully deleted matching task", zap.Uint64("task_id", deletedTaskID))
|
||||
}
|
||||
|
||||
return deletedTaskID, nil
|
||||
}
|
||||
|
||||
@ -1341,6 +1341,7 @@ func (rc *SnapClient) execAndValidateChecksum(
|
||||
|
||||
item, exists := rc.checkpointChecksum[tbl.Table.ID]
|
||||
if !exists {
|
||||
log.Info("did not find checksum from checkpoint, scanning table to calculate checksum")
|
||||
startTS, err := restore.GetTSWithRetry(ctx, rc.pdClient)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
|
||||
@ -2573,3 +2573,127 @@ func setTablesRestoreModeIfNeeded(tables []*metautil.Table, cfg *SnapshotRestore
|
||||
log.Info("set tables to restore mode for filtered PiTR restore", zap.Int("table count", len(tables)))
|
||||
}
|
||||
}
|
||||
|
||||
// RunRestoreAbort aborts a restore task by finding it in the registry and cleaning up
|
||||
// Similar to resumeOrCreate, it first resolves the restoredTS then finds and deletes the matching paused task
|
||||
func RunRestoreAbort(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
|
||||
cfg.Adjust()
|
||||
defer summary.Summary(cmdName)
|
||||
ctx, cancel := context.WithCancel(c)
|
||||
defer cancel()
|
||||
|
||||
keepaliveCfg := GetKeepalive(&cfg.Config)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, true, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer mgr.Close()
|
||||
|
||||
// get upstream cluster ID and startTS from backup storage if not already set
|
||||
if cfg.UpstreamClusterID == 0 {
|
||||
if IsStreamRestore(cmdName) {
|
||||
// For PiTR restore, get cluster ID from log storage
|
||||
_, s, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
logInfo, err := getLogInfoFromStorage(ctx, s)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
cfg.UpstreamClusterID = logInfo.clusterID
|
||||
|
||||
// For PiTR with full backup, get startTS from full backup meta
|
||||
if len(cfg.FullBackupStorage) > 0 && cfg.StartTS == 0 {
|
||||
startTS, fullClusterID, err := getFullBackupTS(ctx, cfg)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID {
|
||||
return errors.Annotatef(berrors.ErrInvalidArgument,
|
||||
"cluster ID mismatch: log backup from cluster %d, full backup from cluster %d",
|
||||
logInfo.clusterID, fullClusterID)
|
||||
}
|
||||
cfg.StartTS = startTS
|
||||
log.Info("extracted startTS from full backup storage for abort",
|
||||
zap.Uint64("start_ts", cfg.StartTS))
|
||||
}
|
||||
} else {
|
||||
// For snapshot restore, get cluster ID from backup meta
|
||||
_, _, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
cfg.UpstreamClusterID = backupMeta.ClusterId
|
||||
}
|
||||
log.Info("extracted upstream cluster ID from backup storage for abort",
|
||||
zap.Uint64("upstream_cluster_id", cfg.UpstreamClusterID),
|
||||
zap.String("cmd", cmdName))
|
||||
}
|
||||
|
||||
// build restore registry
|
||||
restoreRegistry, err := registry.NewRestoreRegistry(g, mgr.GetDomain())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer restoreRegistry.Close()
|
||||
|
||||
// determine if restoredTS was user-specified
|
||||
// if RestoreTS is 0, it means user didn't specify it (similar to resumeOrCreate logic)
|
||||
isRestoredTSUserSpecified := cfg.RestoreTS != 0
|
||||
|
||||
// create registration info from config to find matching tasks
|
||||
registrationInfo := registry.RegistrationInfo{
|
||||
FilterStrings: cfg.FilterStr,
|
||||
StartTS: cfg.StartTS,
|
||||
RestoredTS: cfg.RestoreTS,
|
||||
UpstreamClusterID: cfg.UpstreamClusterID,
|
||||
WithSysTable: cfg.WithSysTable,
|
||||
Cmd: cmdName,
|
||||
}
|
||||
|
||||
// find and delete matching paused task atomically
|
||||
// this will first resolve the restoredTS (similar to resumeOrCreate) then find and delete the task
|
||||
deletedRestoreID, err := restoreRegistry.FindAndDeleteMatchingTask(ctx, registrationInfo, isRestoredTSUserSpecified)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if deletedRestoreID == 0 {
|
||||
log.Info("no paused restore task found with matching parameters")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info("successfully deleted matching paused restore task", zap.Uint64("restoreId", deletedRestoreID))
|
||||
|
||||
// clean up checkpoint data for the deleted task
|
||||
log.Info("cleaning up checkpoint data", zap.Uint64("restoreId", deletedRestoreID))
|
||||
|
||||
// update config with restore ID to clean up checkpoint
|
||||
cfg.RestoreID = deletedRestoreID
|
||||
|
||||
// initialize all checkpoint managers for cleanup (deletion is noop if checkpoints not exist)
|
||||
if len(cfg.CheckpointStorage) > 0 {
|
||||
clusterID := mgr.GetPDClient().GetClusterID(ctx)
|
||||
log.Info("initializing storage checkpoint meta managers for cleanup",
|
||||
zap.Uint64("restoreID", deletedRestoreID),
|
||||
zap.Uint64("clusterID", clusterID))
|
||||
if err := cfg.newStorageCheckpointMetaManagerPITR(ctx, clusterID, deletedRestoreID); err != nil {
|
||||
log.Warn("failed to initialize storage checkpoint meta managers for cleanup", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
log.Info("initializing table checkpoint meta managers for cleanup",
|
||||
zap.Uint64("restoreID", deletedRestoreID))
|
||||
if err := cfg.newTableCheckpointMetaManagerPITR(g, mgr.GetDomain(), deletedRestoreID); err != nil {
|
||||
log.Warn("failed to initialize table checkpoint meta managers for cleanup", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// clean up checkpoint data
|
||||
cleanUpCheckpoints(ctx, cfg)
|
||||
|
||||
log.Info("successfully aborted restore task and cleaned up checkpoint data. "+
|
||||
"Use drop statements to clean up the restored data from the cluster if you want to.",
|
||||
zap.Uint64("restoreId", deletedRestoreID))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1337,6 +1337,9 @@ func RunStreamRestore(
|
||||
// if not set by user, restore to the max TS available
|
||||
if cfg.RestoreTS == 0 {
|
||||
cfg.RestoreTS = logInfo.logMaxTS
|
||||
cfg.IsRestoredTSUserSpecified = false
|
||||
} else {
|
||||
cfg.IsRestoredTSUserSpecified = true
|
||||
}
|
||||
cfg.UpstreamClusterID = logInfo.clusterID
|
||||
|
||||
|
||||
@ -39,6 +39,86 @@ cleanup_and_exit() {
|
||||
}
|
||||
trap cleanup_and_exit EXIT
|
||||
|
||||
verify_table_data() {
|
||||
local db_name="$1"
|
||||
local table_prefix="$2"
|
||||
local expected_value="$3"
|
||||
local test_context="${4:-verification}"
|
||||
|
||||
echo "Verifying table $db_name.${table_prefix}_$expected_value contains value $expected_value ($test_context)..."
|
||||
run_sql "select c from $db_name.${table_prefix}_$expected_value;"
|
||||
check_contains "c: $expected_value"
|
||||
}
|
||||
|
||||
verify_registry() {
|
||||
local filter_condition="$1"
|
||||
local should_exist="$2" # true or false
|
||||
local test_context="${3:-registry check}"
|
||||
|
||||
if [ "$should_exist" = "true" ]; then
|
||||
echo "Verifying that registry entries EXIST for '$filter_condition' ($test_context)..."
|
||||
else
|
||||
echo "Verifying that NO registry entries exist for '$filter_condition' ($test_context)..."
|
||||
fi
|
||||
|
||||
run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE $filter_condition;"
|
||||
|
||||
# Extract count from MySQL output
|
||||
registry_result=$(cat "$TEST_DIR/sql_res.$TEST_NAME.txt")
|
||||
actual_count=$(echo "$registry_result" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
actual_count=${actual_count:-0}
|
||||
|
||||
if [ "$should_exist" = "true" ]; then
|
||||
if [ "$actual_count" -gt 0 ]; then
|
||||
echo "PASS: Found $actual_count registry entries"
|
||||
return 0
|
||||
else
|
||||
echo "ERROR: No registry entries found for condition: $filter_condition"
|
||||
echo "Registry query result:"
|
||||
echo "$registry_result"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
if [ "$actual_count" -eq 0 ]; then
|
||||
echo "PASS: No registry entries found (as expected)"
|
||||
return 0
|
||||
else
|
||||
echo "ERROR: Found $actual_count registry entries when expecting none"
|
||||
echo "Registry query result:"
|
||||
echo "$registry_result"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
|
||||
verify_snapshot_checkpoint_databases_exist() {
|
||||
local expected_count="${1:-4}"
|
||||
local test_context="${2:-checkpoint existence check}"
|
||||
|
||||
echo "Verifying snapshot checkpoint databases exist ($test_context)..."
|
||||
|
||||
# Check for snapshot checkpoint databases - every restore type creates these
|
||||
local snapshot_checkpoint_dbs=$(run_sql "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME LIKE '__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint_%';" | grep -v "SCHEMA_NAME" | grep -v "^$" || true)
|
||||
|
||||
local actual_count=0
|
||||
if [ -n "$snapshot_checkpoint_dbs" ]; then
|
||||
actual_count=$(echo "$snapshot_checkpoint_dbs" | wc -l | tr -d ' ')
|
||||
echo "Found snapshot checkpoint databases:"
|
||||
echo "$snapshot_checkpoint_dbs" | sed 's/^/ /'
|
||||
fi
|
||||
|
||||
if [ "$actual_count" -ge "$expected_count" ]; then
|
||||
echo "PASS: Found $actual_count snapshot checkpoint databases (expected at least $expected_count)"
|
||||
return 0
|
||||
else
|
||||
echo "WARNING: Found only $actual_count snapshot checkpoint databases (expected at least $expected_count)"
|
||||
echo "This may indicate checkpoints weren't created properly"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
verify_no_temporary_databases() {
|
||||
local test_case="${1:-unknown test}"
|
||||
# this function verifies that BR has properly cleaned up all temporary databases
|
||||
@ -60,6 +140,24 @@ verify_no_temporary_databases() {
|
||||
echo "Verification passed: No temporary databases found in $test_case"
|
||||
}
|
||||
|
||||
verify_checkpoint_cleanup() {
|
||||
local test_case="${1:-unknown test}"
|
||||
echo "Verifying checkpoint cleanup after abort in $test_case..."
|
||||
|
||||
# Check for any BR temporary databases
|
||||
local temp_dbs=$(run_sql "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME LIKE '__TiDB_BR_Temporary_%';" | grep -v "SCHEMA_NAME" | grep -v "^$" || true)
|
||||
|
||||
if [ -n "$temp_dbs" ]; then
|
||||
echo "WARNING: Found temporary databases that should have been cleaned up:"
|
||||
echo "$temp_dbs" | sed 's/^/ /'
|
||||
echo "NOTE: Some temporary databases remain - this may be expected if cleanup is async or if there are other running tests"
|
||||
else
|
||||
echo "PASS: All temporary databases have been cleaned up"
|
||||
fi
|
||||
|
||||
echo "Checkpoint cleanup verification completed for $test_case"
|
||||
}
|
||||
|
||||
create_tables_with_values() {
|
||||
local prefix=$1 # table name prefix
|
||||
local count=$2 # number of tables to create
|
||||
@ -142,24 +240,24 @@ test_mixed_parallel_restores() {
|
||||
# Verify DB restore (full database with filter)
|
||||
echo "Verifying full restore with filter of $DB"
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB.full_$i;" | grep $i
|
||||
verify_table_data $DB "full" $i
|
||||
done
|
||||
|
||||
# Verify DB2 restore (PITR with filter)
|
||||
echo "Verifying PITR restore with filter of $DB2..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB2.full_$i;" | grep $i
|
||||
verify_table_data $DB2 "full" $i
|
||||
done
|
||||
|
||||
# Verify DB3 restore (specific db)
|
||||
echo "Verifying specific db restore for $DB3..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB3.full_$i;" | grep $i
|
||||
verify_table_data $DB3 "full" $i
|
||||
done
|
||||
|
||||
# Verify DB4 restore (specific table)
|
||||
echo "Verifying specific table restore for $DB4..."
|
||||
run_sql "select c from $DB4.full_1;" | grep 1
|
||||
verify_table_data $DB4 "full" 1
|
||||
for i in $(seq 2 $TABLE_COUNT); do
|
||||
if run_sql "show tables from $DB4 like 'full_$i';" | grep -q "full_$i"; then
|
||||
echo "Error: Table full_$i should not have been restored" >&2
|
||||
@ -170,7 +268,7 @@ test_mixed_parallel_restores() {
|
||||
# Verify DB_LOG restore
|
||||
echo "Verifying PITR restore of $DB_LOG (created after backup)..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB_LOG.log_$i;" | grep $i
|
||||
verify_table_data $DB_LOG "log" $i
|
||||
done
|
||||
|
||||
echo "Mixed parallel restores with specified parameters completed successfully"
|
||||
@ -206,8 +304,9 @@ test_concurrent_restore_table_conflicts() {
|
||||
run_br restore point --filter "$DB.*" --full-backup-storage "$BACKUP_DIR" -s "$LOG_BACKUP_DIR"
|
||||
|
||||
# verify the first database was restored correctly
|
||||
echo "Verifying first database was restored correctly..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB.full_$i;" | grep $i
|
||||
verify_table_data $DB "full" $i
|
||||
done
|
||||
|
||||
verify_no_temporary_databases "Test Case 2: Concurrent restore with table conflicts"
|
||||
@ -230,8 +329,9 @@ test_restore_with_different_systable_settings() {
|
||||
# should succeed because we're using a different with-sys-table setting
|
||||
run_br restore full --filter "mysql.*" --filter "$DB2.*" --with-sys-table=false -s "$BACKUP_DIR"
|
||||
|
||||
echo "Verifying different system table settings restore..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB2.full_$i;" | grep $i
|
||||
verify_table_data $DB2 "full" $i
|
||||
done
|
||||
|
||||
# complete the first one
|
||||
@ -271,10 +371,6 @@ test_auto_restored_ts_conflict() {
|
||||
echo "Waiting for log checkpoint to advance..."
|
||||
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance ${TASK_NAME}_pitr
|
||||
|
||||
# wait a few seconds to ensure log backup has progressed beyond this timestamp
|
||||
echo "Waiting for log backup to advance..."
|
||||
sleep 5
|
||||
|
||||
# stop log backup
|
||||
run_br log stop --task-name ${TASK_NAME}_pitr
|
||||
|
||||
@ -295,31 +391,8 @@ test_auto_restored_ts_conflict() {
|
||||
|
||||
echo "First PiTR restore failed as expected, leaving paused registry entry"
|
||||
|
||||
echo "Checking for registry entries..."
|
||||
|
||||
registry_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "TABLE_ERROR")
|
||||
|
||||
echo "Debug: Registry check output: '$registry_check'"
|
||||
|
||||
# Extract count from MySQL vertical format output
|
||||
first_task_count=$(echo "$registry_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
|
||||
if [ -z "$first_task_count" ] || [ "$first_task_count" -eq 0 ]; then
|
||||
echo "Error: No tasks found in registry (count: ${first_task_count:-0})"
|
||||
echo "Raw output: '$registry_check'"
|
||||
return 1
|
||||
fi
|
||||
|
||||
echo "Found $first_task_count task(s) in registry"
|
||||
|
||||
# Get basic task information for verification
|
||||
echo "Debug: Checking task details..."
|
||||
run_sql "SELECT id, status, filter_strings FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' ORDER BY id DESC LIMIT 1;" || {
|
||||
echo "Warning: Could not query task details, continuing..."
|
||||
return 0
|
||||
}
|
||||
|
||||
echo "PASS: Registry verification completed (found entries as expected)"
|
||||
# Verify that a task was created in the registry
|
||||
verify_registry "filter_strings = '$DB.*'" true "Test 1: Failed restore should create registry entry"
|
||||
|
||||
# Test 2: Retry without explicit restored-ts (auto-detection) - should resume existing paused task
|
||||
echo "Test 2: Retry without explicit restored-ts (auto-detection)..."
|
||||
@ -330,25 +403,12 @@ test_auto_restored_ts_conflict() {
|
||||
|
||||
# Verify the restore succeeded
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB.pitr_$i;" | grep $i
|
||||
verify_table_data $DB "pitr" $i
|
||||
done
|
||||
echo "PASS: Second restore with auto-detection succeeded by resuming existing task"
|
||||
|
||||
# Check registry state after successful restore
|
||||
echo "Checking registry state after restore..."
|
||||
registry_count_after=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "0")
|
||||
echo "Debug: Registry entries after restore: '$registry_count_after'"
|
||||
|
||||
final_count=$(echo "$registry_count_after" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
final_count=${final_count:-0}
|
||||
|
||||
if [ "$final_count" -eq 0 ]; then
|
||||
echo "PASS: Task was completed and cleaned up from registry"
|
||||
else
|
||||
echo "ERROR: Task may still exist in registry (count: $final_count)"
|
||||
# Show registry state for debugging
|
||||
run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "Could not query details"
|
||||
fi
|
||||
# Check registry state after successful restore - should be cleaned up
|
||||
verify_registry "filter_strings = '$DB.*'" false "Test 2: Successful restore should clean up registry"
|
||||
|
||||
# Clean up any remaining registry entries
|
||||
run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';"
|
||||
@ -368,27 +428,13 @@ test_auto_restored_ts_conflict() {
|
||||
exit 1
|
||||
fi
|
||||
export GO_FAILPOINTS=""
|
||||
|
||||
|
||||
# Now manually change the paused task to running status to simulate a stuck running task
|
||||
echo "Manually changing paused task to running status to simulate stuck task..."
|
||||
run_sql "UPDATE mysql.tidb_restore_registry SET status = 'running' WHERE filter_strings = '$DB.*' AND status = 'paused';"
|
||||
|
||||
# Verify that we actually have a running task after the update
|
||||
echo "Verifying that we have a running task..."
|
||||
stuck_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' AND status = 'running';" 2>/dev/null || echo "0")
|
||||
echo "Debug: Running task check: '$stuck_check'"
|
||||
|
||||
# Verify we have exactly 1 running task before attempting restore
|
||||
stuck_count=$(echo "$stuck_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
stuck_count=${stuck_count:-0}
|
||||
|
||||
if [ "$stuck_count" -eq 0 ]; then
|
||||
echo "ERROR: Failed to create running task for stale detection test"
|
||||
echo "Expected to have at least 1 running task after manual status update"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Successfully created $stuck_count running task(s) for stale detection test"
|
||||
verify_registry "filter_strings = '$DB.*' AND status = 'running'" true "Test 3: Should have running task for stale detection test"
|
||||
|
||||
# Try to restore without explicit restored-ts - should detect stale task, transition it to paused, and reuse it
|
||||
echo "Attempting restore without explicit restored-ts (testing stale detection and reuse)..."
|
||||
@ -399,79 +445,253 @@ test_auto_restored_ts_conflict() {
|
||||
# Verify the restore succeeded by checking the data
|
||||
echo "Verifying restore succeeded..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB.pitr_$i;" | grep $i
|
||||
verify_table_data $DB "pitr" $i
|
||||
done
|
||||
echo "PASS: Stale task detection and reuse worked correctly"
|
||||
|
||||
# Check final task status - should be cleaned up after successful restore
|
||||
echo "Checking final task status after successful restore..."
|
||||
final_task_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "0")
|
||||
final_task_count=$(echo "$final_task_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
final_task_count=${final_task_count:-0}
|
||||
verify_registry "filter_strings = '$DB.*'" false "Test 3: Successful restore should clean up registry"
|
||||
|
||||
if [ "$final_task_count" -eq 0 ]; then
|
||||
echo "PASS: Task was completed and cleaned up from registry"
|
||||
else
|
||||
echo "ERROR: Task should have been cleaned up but $final_task_count task(s) remain in registry"
|
||||
run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "Could not query task details"
|
||||
exit 1
|
||||
fi
|
||||
# Test 4: Test that user can resume paused task with same explicit RestoreTS
|
||||
echo "Test 4: Testing user can resume paused task with same explicit RestoreTS..."
|
||||
|
||||
# Test 4: Test with user-specified RestoreTS (should bypass conflict resolution)
|
||||
echo "Test 4: Testing user-specified RestoreTS (should bypass all conflict resolution)..."
|
||||
# Clean up any existing data from previous tests (use $DB since backup only contains $DB data)
|
||||
run_sql "drop database if exists $DB;"
|
||||
|
||||
# Use DB2 for this test to avoid table conflicts with existing DB
|
||||
run_sql "drop database if exists $DB2;"
|
||||
|
||||
# Create a paused task for DB2 first
|
||||
# Create a paused task for $DB first
|
||||
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)"
|
||||
restore_fail=0
|
||||
run_br restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" || restore_fail=1
|
||||
run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
export GO_FAILPOINTS=""
|
||||
|
||||
echo "Checking for created paused task for $DB2..."
|
||||
run_sql "SELECT id, restored_ts FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB2.*' ORDER BY id DESC LIMIT 1;" 2>/dev/null || echo "Could not query paused task details"
|
||||
# Verify the paused task was created
|
||||
verify_registry "filter_strings = '$DB.*'" true "Test 4: Failed restore should create registry entry"
|
||||
|
||||
# Now try with different user-specified RestoreTS - should create new task, not reuse existing
|
||||
different_ts=$((log_backup_ts - 1000000)) # Slightly different timestamp
|
||||
echo "Attempting restore with user-specified RestoreTS ($different_ts) different from existing task..."
|
||||
# Get the task ID for verification
|
||||
task_info_before=$(run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' AND status = 'paused' ORDER BY id DESC LIMIT 1;" 2>/dev/null || echo "")
|
||||
echo "Task info before resume: $task_info_before"
|
||||
|
||||
run_br restore point --filter "$DB2.*" --restored-ts $different_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR"
|
||||
# Now try to resume with the same user-specified RestoreTS - should reuse existing task
|
||||
echo "Attempting to resume with same user-specified RestoreTS ($log_backup_ts)..."
|
||||
|
||||
# Verify restore succeeded - Note: The backup contains pitr_* tables, not full_* tables
|
||||
run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR"
|
||||
|
||||
# Verify restore succeeded - check that the expected tables exist and have correct data
|
||||
echo "Verifying restore succeeded..."
|
||||
for i in $(seq 1 $TABLE_COUNT); do
|
||||
run_sql "select c from $DB2.pitr_$i;" | grep $i
|
||||
verify_table_data $DB "pitr" $i
|
||||
done
|
||||
echo "PASS: User-specified RestoreTS bypassed conflict resolution correctly"
|
||||
|
||||
# Verify that the same task was resumed (not a new task created)
|
||||
verify_registry "filter_strings = '$DB.*'" false "Test 4: Task should be cleaned up after successful restore"
|
||||
|
||||
echo "PASS: User-specified RestoreTS correctly resumed existing paused task"
|
||||
|
||||
# Final cleanup
|
||||
run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';"
|
||||
run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB2.*';"
|
||||
|
||||
echo "Final registry state check:"
|
||||
final_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings LIKE '$DB%' OR filter_strings LIKE '$DB2%';" 2>/dev/null || echo "0")
|
||||
echo "Debug: Final registry check: '$final_check'"
|
||||
|
||||
final_remaining=$(echo "$final_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*')
|
||||
final_remaining=${final_remaining:-0}
|
||||
|
||||
if [ "$final_remaining" -eq 0 ]; then
|
||||
echo "PASS: Registry is clean"
|
||||
else
|
||||
echo "Note: $final_remaining task(s) remain in registry"
|
||||
run_sql "SELECT id, status, filter_strings FROM mysql.tidb_restore_registry WHERE filter_strings LIKE '$DB%' OR filter_strings LIKE '$DB2%';" 2>/dev/null || echo "Could not query remaining task details"
|
||||
fi
|
||||
# Final registry state check
|
||||
verify_registry "filter_strings LIKE '$DB%'" false "Final cleanup: Registry should be clean"
|
||||
|
||||
echo "Intelligent RestoreTS resolution test completed successfully"
|
||||
echo "This test validates the new behavior:"
|
||||
echo "1. Failed restore leaves paused task in registry"
|
||||
echo "2. Retry without explicit restored-ts resumes existing paused task"
|
||||
echo "3. Stale running task detection works (when applicable)"
|
||||
echo "4. User-specified restored-ts bypasses conflict resolution"
|
||||
cleanup
|
||||
}
|
||||
|
||||
test_restore_abort() {
|
||||
echo "Test Case 6: Comprehensive restore abort functionality for all restore types"
|
||||
|
||||
# use separate backup directories for this test
|
||||
ABORT_BACKUP_DIR="local://$TEST_DIR/abort_backup"
|
||||
ABORT_LOG_BACKUP_DIR="local://$TEST_DIR/abort_log_backup"
|
||||
|
||||
echo "Setting up backup data for abort testing..."
|
||||
|
||||
# create initial data for multiple databases
|
||||
run_sql "create database if not exists $DB;"
|
||||
run_sql "create database if not exists $DB2;"
|
||||
run_sql "create database if not exists $DB3;"
|
||||
run_sql "create database if not exists $DB4;"
|
||||
|
||||
create_tables_with_values "abort" $TABLE_COUNT $DB
|
||||
create_tables_with_values "abort" $TABLE_COUNT $DB2
|
||||
create_tables_with_values "abort" $TABLE_COUNT $DB3
|
||||
create_tables_with_values "abort" $TABLE_COUNT $DB4
|
||||
|
||||
# start log backup
|
||||
run_br log start --task-name ${TASK_NAME}_abort -s "$ABORT_LOG_BACKUP_DIR"
|
||||
|
||||
# take snapshot backup
|
||||
run_br backup full -s "$ABORT_BACKUP_DIR"
|
||||
|
||||
# add more data after snapshot backup
|
||||
run_sql "create database if not exists ${DB}_after_snapshot;"
|
||||
create_tables_with_values "after" $TABLE_COUNT ${DB}_after_snapshot
|
||||
|
||||
# wait for log checkpoint to advance
|
||||
log_backup_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
|
||||
echo "Using log backup timestamp: $log_backup_ts"
|
||||
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance ${TASK_NAME}_abort
|
||||
|
||||
# stop log backup
|
||||
run_br log stop --task-name ${TASK_NAME}_abort
|
||||
|
||||
# clean up source data
|
||||
run_sql "drop database $DB;"
|
||||
run_sql "drop database $DB2;"
|
||||
run_sql "drop database $DB3;"
|
||||
run_sql "drop database $DB4;"
|
||||
run_sql "drop database ${DB}_after_snapshot;"
|
||||
|
||||
echo "=== Step 1: Create All Paused Restore Tasks ==="
|
||||
|
||||
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)"
|
||||
|
||||
echo "Creating paused PiTR restore task..."
|
||||
restore_fail=0
|
||||
run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting PiTR restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Creating paused full restore task..."
|
||||
restore_fail=0
|
||||
run_br restore full --filter "$DB2.*" -s "$ABORT_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting full restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Creating paused database restore task..."
|
||||
restore_fail=0
|
||||
run_br restore db --db "$DB3" -s "$ABORT_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting db restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Creating paused table restore task..."
|
||||
restore_fail=0
|
||||
run_br restore table --db "$DB4" --table "abort_1" -s "$ABORT_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting table restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
export GO_FAILPOINTS=""
|
||||
|
||||
echo "=== Step 2: Verify All Paused Tasks Exist ==="
|
||||
|
||||
# Verify all 4 paused tasks exist in registry
|
||||
verify_registry "filter_strings = '$DB.*' AND status = 'paused'" true "paused PiTR task"
|
||||
verify_registry "filter_strings = '$DB2.*' AND status = 'paused'" true "paused full restore task"
|
||||
verify_registry "cmd = 'DataBase Restore' AND status = 'paused'" true "paused database restore task"
|
||||
verify_registry "cmd = 'Table Restore' AND status = 'paused'" true "paused table restore task"
|
||||
|
||||
# Verify that snapshot checkpoint databases were created for the 4 restore tasks
|
||||
verify_snapshot_checkpoint_databases_exist 4 "after creating 4 paused restore tasks"
|
||||
|
||||
# Debug: Print entire registry contents before abort
|
||||
echo "DEBUG: Registry contents before abort attempt:"
|
||||
run_sql "SELECT id, filter_strings, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd, last_heartbeat_time FROM mysql.tidb_restore_registry;"
|
||||
cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
|
||||
|
||||
echo "=== Step 3: Abort All Restore Tasks ==="
|
||||
|
||||
echo "Aborting PiTR restore task..."
|
||||
run_br abort restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR"
|
||||
|
||||
echo "Aborting full restore task..."
|
||||
run_br abort restore full --filter "$DB2.*" -s "$ABORT_BACKUP_DIR"
|
||||
|
||||
echo "Aborting database restore task..."
|
||||
run_br abort restore db --db "$DB3" -s "$ABORT_BACKUP_DIR"
|
||||
|
||||
echo "Aborting table restore task..."
|
||||
run_br abort restore table --db "$DB4" --table "abort_1" -s "$ABORT_BACKUP_DIR"
|
||||
|
||||
echo "=== Step 4: Verify All Tasks Deleted ==="
|
||||
|
||||
# Verify all tasks were deleted from registry
|
||||
verify_registry "filter_strings = '$DB.*'" false "PiTR task deletion"
|
||||
verify_registry "filter_strings = '$DB2.*'" false "full restore task deletion"
|
||||
verify_registry "cmd = 'DataBase Restore'" false "database restore task deletion"
|
||||
verify_registry "cmd = 'Table Restore'" false "table restore task deletion"
|
||||
|
||||
# Verify no paused tasks remain
|
||||
verify_registry "status = 'paused'" false "all paused tasks cleaned up"
|
||||
|
||||
# Verify checkpoint cleanup for all restore types
|
||||
verify_checkpoint_cleanup "All restore types abort"
|
||||
|
||||
# Clean up all user databases since the aborted restores may have left partial data
|
||||
echo "Cleaning up all user databases after abort test..."
|
||||
run_sql "drop database if exists $DB;"
|
||||
run_sql "drop database if exists $DB2;"
|
||||
run_sql "drop database if exists $DB3;"
|
||||
run_sql "drop database if exists $DB4;"
|
||||
run_sql "drop database if exists ${DB}_after_snapshot;"
|
||||
echo "User database cleanup completed"
|
||||
|
||||
echo "=== Step 5: Special Cases ==="
|
||||
|
||||
echo "Abort non-existent task (should succeed gracefully)..."
|
||||
|
||||
# Try to abort when no matching task exists
|
||||
run_br abort restore point --filter "${DB}_nonexistent.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR"
|
||||
|
||||
echo "PASS: Abort of non-existent task completed gracefully"
|
||||
|
||||
echo "Abort with auto-detected restored-ts..."
|
||||
|
||||
# Create another paused PiTR task
|
||||
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)"
|
||||
restore_fail=0
|
||||
run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
export GO_FAILPOINTS=""
|
||||
|
||||
# Verify paused task exists
|
||||
verify_registry "filter_strings = '$DB.*' AND status = 'paused'" true "paused task creation for auto-detection test"
|
||||
|
||||
# Abort without specifying restored-ts (should use auto-detection)
|
||||
run_br abort restore point --filter "$DB.*" --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR"
|
||||
|
||||
# Verify task was deleted
|
||||
verify_registry "filter_strings = '$DB.*'" false "task deletion after abort with auto-detection"
|
||||
|
||||
echo "Try to abort a stale running task (should detect it's dead and clean it up)..."
|
||||
|
||||
# Create a paused task and manually change it to running
|
||||
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)"
|
||||
restore_fail=0
|
||||
run_br restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1
|
||||
if [ $restore_fail -ne 1 ]; then
|
||||
echo 'expecting restore to fail before completion but succeeded'
|
||||
exit 1
|
||||
fi
|
||||
export GO_FAILPOINTS=""
|
||||
|
||||
# Change status to running
|
||||
run_sql "UPDATE mysql.tidb_restore_registry SET status = 'running' WHERE filter_strings = '$DB2.*' AND status = 'paused';"
|
||||
|
||||
# Verify we have a running task
|
||||
verify_registry "filter_strings = '$DB2.*' AND status = 'running'" true "stale running task creation for abort test"
|
||||
|
||||
# Try to abort the stale running task - should detect it's dead and clean it up
|
||||
run_br abort restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR"
|
||||
|
||||
# Verify task was deleted (abort should have detected it was stale and cleaned it up)
|
||||
verify_registry "filter_strings = '$DB2.*'" false "stale running task should be deleted after abort"
|
||||
|
||||
echo "Comprehensive restore abort functionality test completed successfully"
|
||||
|
||||
cleanup
|
||||
}
|
||||
@ -482,5 +702,6 @@ test_mixed_parallel_restores
|
||||
test_concurrent_restore_table_conflicts
|
||||
test_restore_with_different_systable_settings
|
||||
test_auto_restored_ts_conflict
|
||||
test_restore_abort
|
||||
|
||||
echo "Parallel restore tests completed successfully"
|
||||
|
||||
Reference in New Issue
Block a user