From e506beccc7aa9ce69c10dc1afffd6a87794269fc Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Sun, 29 Jun 2025 23:03:00 -0400 Subject: [PATCH] br: add tool to clean up registry and checkpoints (#61977) close pingcap/tidb#61964, close pingcap/tidb#62011 --- br/cmd/br/BUILD.bazel | 1 + br/cmd/br/abort.go | 141 +++++++++ br/cmd/br/main.go | 1 + br/pkg/registry/registration.go | 189 ++++++++++- br/pkg/restore/snap_client/client.go | 1 + br/pkg/task/restore.go | 124 ++++++++ br/pkg/task/stream.go | 3 + br/tests/br_parallel_restore/run.sh | 451 ++++++++++++++++++++------- 8 files changed, 785 insertions(+), 126 deletions(-) create mode 100644 br/cmd/br/abort.go diff --git a/br/cmd/br/BUILD.bazel b/br/cmd/br/BUILD.bazel index 6e3615a62c..4d89f2460c 100644 --- a/br/cmd/br/BUILD.bazel +++ b/br/cmd/br/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test") go_library( name = "br_lib", srcs = [ + "abort.go", "backup.go", "cmd.go", "debug.go", diff --git a/br/cmd/br/abort.go b/br/cmd/br/abort.go new file mode 100644 index 0000000000..e32eaaf4fb --- /dev/null +++ b/br/cmd/br/abort.go @@ -0,0 +1,141 @@ +// Copyright 2025 PingCAP, Inc. Licensed under Apache-2.0. + +package main + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/trace" + "github.com/pingcap/tidb/br/pkg/version/build" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/spf13/cobra" + "go.uber.org/zap" + "sourcegraph.com/sourcegraph/appdash" +) + +// NewAbortCommand returns an abort subcommand +func NewAbortCommand() *cobra.Command { + command := &cobra.Command{ + Use: "abort", + Short: "abort restore tasks", + SilenceUsage: true, + PersistentPreRunE: func(c *cobra.Command, args []string) error { + if err := Init(c); err != nil { + return errors.Trace(err) + } + build.LogInfo(build.BR) + logutil.LogEnvVariables() + task.LogArguments(c) + return nil + }, + } + + command.AddCommand( + newAbortRestoreCommand(), + // future: newAbortBackupCommand(), + ) + task.DefineRestoreFlags(command.PersistentFlags()) + + return command +} + +// newAbortRestoreCommand returns an abort restore subcommand +func newAbortRestoreCommand() *cobra.Command { + command := &cobra.Command{ + Use: "restore", + Short: "abort restore tasks", + SilenceUsage: true, + } + + command.AddCommand( + newAbortRestoreFullCommand(), + newAbortRestoreDBCommand(), + newAbortRestoreTableCommand(), + newAbortRestorePointCommand(), + ) + + return command +} + +func newAbortRestoreFullCommand() *cobra.Command { + command := &cobra.Command{ + Use: "full", + Short: "abort a full restore task", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return runAbortRestoreCommand(cmd, task.FullRestoreCmd) + }, + } + // define flags specific to full restore + task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, false) + task.DefineRestoreSnapshotFlags(command) + return command +} + +func newAbortRestoreDBCommand() *cobra.Command { + command := &cobra.Command{ + Use: "db", + Short: "abort a database restore task", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return runAbortRestoreCommand(cmd, task.DBRestoreCmd) + }, + } + task.DefineDatabaseFlags(command) + return command +} + +func newAbortRestoreTableCommand() *cobra.Command { + command := &cobra.Command{ + Use: "table", + Short: "abort a table restore task", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error 
{ + return runAbortRestoreCommand(cmd, task.TableRestoreCmd) + }, + } + task.DefineTableFlags(command) + return command +} + +func newAbortRestorePointCommand() *cobra.Command { + command := &cobra.Command{ + Use: "point", + Short: "abort a point-in-time restore task", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return runAbortRestoreCommand(cmd, task.PointRestoreCmd) + }, + } + task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, true) + task.DefineStreamRestoreFlags(command) + return command +} + +func runAbortRestoreCommand(command *cobra.Command, cmdName string) error { + cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} + if err := cfg.ParseFromFlags(command.Flags(), false); err != nil { + command.SilenceUsage = false + return errors.Trace(err) + } + + if task.IsStreamRestore(cmdName) { + if err := cfg.ParseStreamRestoreFlags(command.Flags()); err != nil { + return errors.Trace(err) + } + } + + ctx := GetDefaultContext() + if cfg.EnableOpenTracing { + var store *appdash.MemoryStore + ctx, store = trace.TracerStartSpan(ctx) + defer trace.TracerFinishSpan(ctx, store) + } + + if err := task.RunRestoreAbort(ctx, tidbGlue, cmdName, &cfg); err != nil { + log.Error("failed to abort restore task", zap.Error(err)) + return errors.Trace(err) + } + return nil +} diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index b0f1931d86..71efa85591 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -29,6 +29,7 @@ func main() { NewRestoreCommand(), NewStreamCommand(), newOperatorCommand(), + NewAbortCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/pkg/registry/registration.go b/br/pkg/registry/registration.go index ab50fb5cf6..3d76146c7a 100644 --- a/br/pkg/registry/registration.go +++ b/br/pkg/registry/registration.go @@ -66,6 +66,13 @@ const ( SET status = %%? WHERE id = %%? AND status = %%?` + // updateStatusFromMultipleSQLTemplate is the SQL template for updating a task's status + // when the current status can be one of multiple values + updateStatusFromMultipleSQLTemplate = ` + UPDATE %s.%s + SET status = %%? + WHERE id = %%? 
AND status IN (%s)` + // resumeTaskByIDSQLTemplate is the SQL template for resuming a paused task by its ID resumeTaskByIDSQLTemplate = ` UPDATE %s.%s @@ -410,20 +417,38 @@ func (r *Registry) collectResettingStatusTasks(ctx context.Context) error { return nil } -// updateTaskStatusConditional updates a task's status only if its current status matches the expected status -func (r *Registry) updateTaskStatusConditional(ctx context.Context, restoreID uint64, currentStatus, +// updateTaskStatusFromMultiple updates a task's status only if its current status matches one of the expected statuses +func (r *Registry) updateTaskStatusFromMultiple(ctx context.Context, restoreID uint64, currentStatuses []TaskStatus, newStatus TaskStatus) error { - log.Info("attempting to update task status", + if len(currentStatuses) == 0 { + return errors.New("currentStatuses cannot be empty") + } + + // build the status list for the IN clause + statusList := make([]string, len(currentStatuses)) + for i, status := range currentStatuses { + statusList[i] = fmt.Sprintf("'%s'", string(status)) + } + statusInClause := strings.Join(statusList, ", ") + + log.Info("attempting to update task status from multiple possible statuses", zap.Uint64("restore_id", restoreID), - zap.String("current_status", string(currentStatus)), + zap.Strings("current_statuses", func() []string { + result := make([]string, len(currentStatuses)) + for i, s := range currentStatuses { + result[i] = string(s) + } + return result + }()), zap.String("new_status", string(newStatus))) - // use where to update only when status is what we want - updateSQL := fmt.Sprintf(updateStatusSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName) + // use where to update only when status is one of the expected values + updateSQL := fmt.Sprintf(updateStatusFromMultipleSQLTemplate, + RestoreRegistryDBName, RestoreRegistryTableName, statusInClause) - if err := r.se.ExecuteInternal(ctx, updateSQL, newStatus, restoreID, currentStatus); err != nil { - return errors.Annotatef(err, "failed to conditionally update task status from %s to %s", - currentStatus, newStatus) + if err := r.se.ExecuteInternal(ctx, updateSQL, newStatus, restoreID); err != nil { + return errors.Annotatef(err, "failed to conditionally update task status from %v to %s", + currentStatuses, newStatus) } return nil @@ -443,11 +468,12 @@ func (r *Registry) Unregister(ctx context.Context, restoreID uint64) error { return nil } -// PauseTask marks a task as paused only if it's currently running +// PauseTask marks a task as paused only if it's currently running or resetting func (r *Registry) PauseTask(ctx context.Context, restoreID uint64) error { // first stop heartbeat manager r.StopHeartbeatManager() - return r.updateTaskStatusConditional(ctx, restoreID, TaskStatusRunning, TaskStatusPaused) + return r.updateTaskStatusFromMultiple(ctx, restoreID, + []TaskStatus{TaskStatusRunning, TaskStatusResetting}, TaskStatusPaused) } // GetRegistrationsByMaxID returns all registrations with IDs smaller than maxID @@ -900,3 +926,144 @@ func (r *Registry) GlobalOperationAfterSetResettingStatus(ctx context.Context, } return nil } + +// FindAndDeleteMatchingTask finds and deletes the registry entry that matches the given restore configuration +// This is used for the abort functionality to clean up the matching task +// Similar to ResumeOrCreateRegistration, it first resolves the restoredTS then finds and deletes the matching +// paused task +// Returns the deleted task ID, or 0 if no matching task was found +func (r 
*Registry) FindAndDeleteMatchingTask(ctx context.Context, + info RegistrationInfo, isRestoredTSUserSpecified bool) (uint64, error) { + // resolve which restoredTS to use + resolvedRestoreTS, err := r.resolveRestoreTS(ctx, info, isRestoredTSUserSpecified) + if err != nil { + return 0, err + } + + // update info with resolved restoredTS if different + if resolvedRestoreTS != info.RestoredTS { + log.Info("using resolved restoredTS for abort operation", + zap.Uint64("original_restored_ts", info.RestoredTS), + zap.Uint64("resolved_restored_ts", resolvedRestoreTS)) + info.RestoredTS = resolvedRestoreTS + } + + filterStrings := strings.Join(info.FilterStrings, FilterSeparator) + + log.Info("searching for matching task to delete", + zap.String("filter_strings", filterStrings), + zap.Uint64("start_ts", info.StartTS), + zap.Uint64("restored_ts", info.RestoredTS), + zap.Uint64("upstream_cluster_id", info.UpstreamClusterID), + zap.Bool("with_sys_table", info.WithSysTable), + zap.String("cmd", info.Cmd)) + + var deletedTaskID uint64 + + err = r.executeInTransaction(ctx, func(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, + sessionOpts []sqlexec.OptionFuncAlias) error { + // find and lock the task that matches the configuration + lookupSQL := fmt.Sprintf(lookupRegistrationSQLTemplate, + RestoreRegistryDBName, RestoreRegistryTableName) + rows, _, err := execCtx.ExecRestrictedSQL(ctx, sessionOpts, lookupSQL, + filterStrings, info.StartTS, info.RestoredTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd) + if err != nil { + return errors.Annotate(err, "failed to lookup matching task") + } + + if len(rows) == 0 { + log.Info("no matching task found to delete") + return nil + } + + if len(rows) > 1 { + log.Error("multiple matching tasks found, this is unexpected and indicates a bug", + zap.Int("count", len(rows))) + return errors.Annotatef(berrors.ErrInvalidArgument, + "found %d matching tasks, expected exactly 1", len(rows)) + } + + // get the single matching task (now locked) + taskID := rows[0].GetUint64(0) + status := rows[0].GetString(1) + + log.Info("found and locked matching task", + zap.Uint64("task_id", taskID), + zap.String("status", status)) + + // handle different task statuses + if status == string(TaskStatusPaused) { + // paused tasks can be directly deleted + } else if status == string(TaskStatusRunning) || status == string(TaskStatusResetting) { + // for running/resetting tasks, check if they are stale (dead processes) + log.Info("task is running/resetting, checking if it's stale before abort", + zap.Uint64("task_id", taskID), + zap.String("status", status)) + + // get the task's heartbeat time to check if it's stale + heartbeatSQL := fmt.Sprintf(selectTaskHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName) + heartbeatRows, _, heartbeatErr := execCtx.ExecRestrictedSQL(ctx, sessionOpts, heartbeatSQL, taskID) + if heartbeatErr != nil { + log.Warn("failed to check task heartbeat during abort, skipping", + zap.Uint64("task_id", taskID), + zap.Error(heartbeatErr)) + return nil + } + + if len(heartbeatRows) == 0 { + log.Warn("task not found when checking heartbeat, skipping abort", + zap.Uint64("task_id", taskID)) + return nil + } + + initialHeartbeatTime := heartbeatRows[0].GetTime(0).String() + + // check if the task is stale (not updating heartbeat) + isStale, staleErr := r.isTaskStale(ctx, taskID, initialHeartbeatTime) + if staleErr != nil { + log.Warn("failed to determine if task is stale, skipping abort", + zap.Uint64("task_id", taskID), + 
zap.Error(staleErr)) + return nil + } + + if !isStale { + log.Info("task is actively running, cannot abort", + zap.Uint64("task_id", taskID), + zap.String("status", status)) + return nil + } + + log.Info("task is stale, proceeding with abort", + zap.Uint64("task_id", taskID), + zap.String("status", status)) + } else { + log.Error("task is in unexpected status, cannot abort", + zap.Uint64("task_id", taskID), + zap.String("status", status)) + return nil + } + + // delete the paused task + deleteSQL := fmt.Sprintf(deleteRegistrationSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName) + _, _, err = execCtx.ExecRestrictedSQL(ctx, sessionOpts, deleteSQL, taskID) + if err != nil { + return errors.Annotatef(err, "failed to delete task %d", taskID) + } + + deletedTaskID = taskID + log.Info("successfully deleted matching paused task", zap.Uint64("task_id", taskID)) + + return nil + }) + + if err != nil { + return 0, err + } + + if deletedTaskID != 0 { + log.Info("successfully deleted matching task", zap.Uint64("task_id", deletedTaskID)) + } + + return deletedTaskID, nil +} diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 51d1e1e494..1807ed5a1c 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -1341,6 +1341,7 @@ func (rc *SnapClient) execAndValidateChecksum( item, exists := rc.checkpointChecksum[tbl.Table.ID] if !exists { + log.Info("did not find checksum from checkpoint, scanning table to calculate checksum") startTS, err := restore.GetTSWithRetry(ctx, rc.pdClient) if err != nil { return errors.Trace(err) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index ea890cdf5f..45caac69a1 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -2573,3 +2573,127 @@ func setTablesRestoreModeIfNeeded(tables []*metautil.Table, cfg *SnapshotRestore log.Info("set tables to restore mode for filtered PiTR restore", zap.Int("table count", len(tables))) } } + +// RunRestoreAbort aborts a restore task by finding it in the registry and cleaning up +// Similar to resumeOrCreate, it first resolves the restoredTS then finds and deletes the matching paused task +func RunRestoreAbort(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + cfg.Adjust() + defer summary.Summary(cmdName) + ctx, cancel := context.WithCancel(c) + defer cancel() + + keepaliveCfg := GetKeepalive(&cfg.Config) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, true, conn.NormalVersionChecker) + if err != nil { + return errors.Trace(err) + } + defer mgr.Close() + + // get upstream cluster ID and startTS from backup storage if not already set + if cfg.UpstreamClusterID == 0 { + if IsStreamRestore(cmdName) { + // For PiTR restore, get cluster ID from log storage + _, s, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config) + if err != nil { + return errors.Trace(err) + } + logInfo, err := getLogInfoFromStorage(ctx, s) + if err != nil { + return errors.Trace(err) + } + cfg.UpstreamClusterID = logInfo.clusterID + + // For PiTR with full backup, get startTS from full backup meta + if len(cfg.FullBackupStorage) > 0 && cfg.StartTS == 0 { + startTS, fullClusterID, err := getFullBackupTS(ctx, cfg) + if err != nil { + return errors.Trace(err) + } + if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID { + return errors.Annotatef(berrors.ErrInvalidArgument, + "cluster ID mismatch: log backup from cluster %d, full backup from cluster %d", + 
logInfo.clusterID, fullClusterID) + } + cfg.StartTS = startTS + log.Info("extracted startTS from full backup storage for abort", + zap.Uint64("start_ts", cfg.StartTS)) + } + } else { + // For snapshot restore, get cluster ID from backup meta + _, _, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) + if err != nil { + return errors.Trace(err) + } + cfg.UpstreamClusterID = backupMeta.ClusterId + } + log.Info("extracted upstream cluster ID from backup storage for abort", + zap.Uint64("upstream_cluster_id", cfg.UpstreamClusterID), + zap.String("cmd", cmdName)) + } + + // build restore registry + restoreRegistry, err := registry.NewRestoreRegistry(g, mgr.GetDomain()) + if err != nil { + return errors.Trace(err) + } + defer restoreRegistry.Close() + + // determine if restoredTS was user-specified + // if RestoreTS is 0, it means user didn't specify it (similar to resumeOrCreate logic) + isRestoredTSUserSpecified := cfg.RestoreTS != 0 + + // create registration info from config to find matching tasks + registrationInfo := registry.RegistrationInfo{ + FilterStrings: cfg.FilterStr, + StartTS: cfg.StartTS, + RestoredTS: cfg.RestoreTS, + UpstreamClusterID: cfg.UpstreamClusterID, + WithSysTable: cfg.WithSysTable, + Cmd: cmdName, + } + + // find and delete matching paused task atomically + // this will first resolve the restoredTS (similar to resumeOrCreate) then find and delete the task + deletedRestoreID, err := restoreRegistry.FindAndDeleteMatchingTask(ctx, registrationInfo, isRestoredTSUserSpecified) + if err != nil { + return errors.Trace(err) + } + + if deletedRestoreID == 0 { + log.Info("no paused restore task found with matching parameters") + return nil + } + + log.Info("successfully deleted matching paused restore task", zap.Uint64("restoreId", deletedRestoreID)) + + // clean up checkpoint data for the deleted task + log.Info("cleaning up checkpoint data", zap.Uint64("restoreId", deletedRestoreID)) + + // update config with restore ID to clean up checkpoint + cfg.RestoreID = deletedRestoreID + + // initialize all checkpoint managers for cleanup (deletion is noop if checkpoints not exist) + if len(cfg.CheckpointStorage) > 0 { + clusterID := mgr.GetPDClient().GetClusterID(ctx) + log.Info("initializing storage checkpoint meta managers for cleanup", + zap.Uint64("restoreID", deletedRestoreID), + zap.Uint64("clusterID", clusterID)) + if err := cfg.newStorageCheckpointMetaManagerPITR(ctx, clusterID, deletedRestoreID); err != nil { + log.Warn("failed to initialize storage checkpoint meta managers for cleanup", zap.Error(err)) + } + } else { + log.Info("initializing table checkpoint meta managers for cleanup", + zap.Uint64("restoreID", deletedRestoreID)) + if err := cfg.newTableCheckpointMetaManagerPITR(g, mgr.GetDomain(), deletedRestoreID); err != nil { + log.Warn("failed to initialize table checkpoint meta managers for cleanup", zap.Error(err)) + } + } + + // clean up checkpoint data + cleanUpCheckpoints(ctx, cfg) + + log.Info("successfully aborted restore task and cleaned up checkpoint data. 
"+ + "Use drop statements to clean up the restored data from the cluster if you want to.", + zap.Uint64("restoreId", deletedRestoreID)) + return nil +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d2421c6209..f9f458d558 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1337,6 +1337,9 @@ func RunStreamRestore( // if not set by user, restore to the max TS available if cfg.RestoreTS == 0 { cfg.RestoreTS = logInfo.logMaxTS + cfg.IsRestoredTSUserSpecified = false + } else { + cfg.IsRestoredTSUserSpecified = true } cfg.UpstreamClusterID = logInfo.clusterID diff --git a/br/tests/br_parallel_restore/run.sh b/br/tests/br_parallel_restore/run.sh index 443e440bfc..063f778440 100644 --- a/br/tests/br_parallel_restore/run.sh +++ b/br/tests/br_parallel_restore/run.sh @@ -39,6 +39,86 @@ cleanup_and_exit() { } trap cleanup_and_exit EXIT +verify_table_data() { + local db_name="$1" + local table_prefix="$2" + local expected_value="$3" + local test_context="${4:-verification}" + + echo "Verifying table $db_name.${table_prefix}_$expected_value contains value $expected_value ($test_context)..." + run_sql "select c from $db_name.${table_prefix}_$expected_value;" + check_contains "c: $expected_value" +} + +verify_registry() { + local filter_condition="$1" + local should_exist="$2" # true or false + local test_context="${3:-registry check}" + + if [ "$should_exist" = "true" ]; then + echo "Verifying that registry entries EXIST for '$filter_condition' ($test_context)..." + else + echo "Verifying that NO registry entries exist for '$filter_condition' ($test_context)..." + fi + + run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE $filter_condition;" + + # Extract count from MySQL output + registry_result=$(cat "$TEST_DIR/sql_res.$TEST_NAME.txt") + actual_count=$(echo "$registry_result" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') + actual_count=${actual_count:-0} + + if [ "$should_exist" = "true" ]; then + if [ "$actual_count" -gt 0 ]; then + echo "PASS: Found $actual_count registry entries" + return 0 + else + echo "ERROR: No registry entries found for condition: $filter_condition" + echo "Registry query result:" + echo "$registry_result" + exit 1 + fi + else + if [ "$actual_count" -eq 0 ]; then + echo "PASS: No registry entries found (as expected)" + return 0 + else + echo "ERROR: Found $actual_count registry entries when expecting none" + echo "Registry query result:" + echo "$registry_result" + exit 1 + fi + fi +} + + + +verify_snapshot_checkpoint_databases_exist() { + local expected_count="${1:-4}" + local test_context="${2:-checkpoint existence check}" + + echo "Verifying snapshot checkpoint databases exist ($test_context)..." 
+ + # Check for snapshot checkpoint databases - every restore type creates these + local snapshot_checkpoint_dbs=$(run_sql "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME LIKE '__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint_%';" | grep -v "SCHEMA_NAME" | grep -v "^$" || true) + + local actual_count=0 + if [ -n "$snapshot_checkpoint_dbs" ]; then + actual_count=$(echo "$snapshot_checkpoint_dbs" | wc -l | tr -d ' ') + echo "Found snapshot checkpoint databases:" + echo "$snapshot_checkpoint_dbs" | sed 's/^/ /' + fi + + if [ "$actual_count" -ge "$expected_count" ]; then + echo "PASS: Found $actual_count snapshot checkpoint databases (expected at least $expected_count)" + return 0 + else + echo "WARNING: Found only $actual_count snapshot checkpoint databases (expected at least $expected_count)" + echo "This may indicate checkpoints weren't created properly" + return 1 + fi +} + verify_no_temporary_databases() { local test_case="${1:-unknown test}" # this function verifies that BR has properly cleaned up all temporary databases @@ -60,6 +140,24 @@ verify_no_temporary_databases() { echo "Verification passed: No temporary databases found in $test_case" } +verify_checkpoint_cleanup() { + local test_case="${1:-unknown test}" + echo "Verifying checkpoint cleanup after abort in $test_case..." + + # Check for any BR temporary databases + local temp_dbs=$(run_sql "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME LIKE '__TiDB_BR_Temporary_%';" | grep -v "SCHEMA_NAME" | grep -v "^$" || true) + + if [ -n "$temp_dbs" ]; then + echo "WARNING: Found temporary databases that should have been cleaned up:" + echo "$temp_dbs" | sed 's/^/ /' + echo "NOTE: Some temporary databases remain - this may be expected if cleanup is async or if there are other running tests" + else + echo "PASS: All temporary databases have been cleaned up" + fi + + echo "Checkpoint cleanup verification completed for $test_case" +} + create_tables_with_values() { local prefix=$1 # table name prefix local count=$2 # number of tables to create @@ -142,24 +240,24 @@ test_mixed_parallel_restores() { # Verify DB restore (full database with filter) echo "Verifying full restore with filter of $DB" for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB.full_$i;" | grep $i + verify_table_data $DB "full" $i done # Verify DB2 restore (PITR with filter) echo "Verifying PITR restore with filter of $DB2..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB2.full_$i;" | grep $i + verify_table_data $DB2 "full" $i done # Verify DB3 restore (specific db) echo "Verifying specific db restore for $DB3..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB3.full_$i;" | grep $i + verify_table_data $DB3 "full" $i done # Verify DB4 restore (specific table) echo "Verifying specific table restore for $DB4..." - run_sql "select c from $DB4.full_1;" | grep 1 + verify_table_data $DB4 "full" 1 for i in $(seq 2 $TABLE_COUNT); do if run_sql "show tables from $DB4 like 'full_$i';" | grep -q "full_$i"; then echo "Error: Table full_$i should not have been restored" >&2 @@ -170,7 +268,7 @@ test_mixed_parallel_restores() { # Verify DB_LOG restore echo "Verifying PITR restore of $DB_LOG (created after backup)..." 
for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB_LOG.log_$i;" | grep $i + verify_table_data $DB_LOG "log" $i done echo "Mixed parallel restores with specified parameters completed successfully" @@ -206,8 +304,9 @@ test_concurrent_restore_table_conflicts() { run_br restore point --filter "$DB.*" --full-backup-storage "$BACKUP_DIR" -s "$LOG_BACKUP_DIR" # verify the first database was restored correctly + echo "Verifying first database was restored correctly..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB.full_$i;" | grep $i + verify_table_data $DB "full" $i done verify_no_temporary_databases "Test Case 2: Concurrent restore with table conflicts" @@ -230,8 +329,9 @@ test_restore_with_different_systable_settings() { # should succeed because we're using a different with-sys-table setting run_br restore full --filter "mysql.*" --filter "$DB2.*" --with-sys-table=false -s "$BACKUP_DIR" + echo "Verifying different system table settings restore..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB2.full_$i;" | grep $i + verify_table_data $DB2 "full" $i done # complete the first one @@ -271,10 +371,6 @@ test_auto_restored_ts_conflict() { echo "Waiting for log checkpoint to advance..." . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance ${TASK_NAME}_pitr - # wait a few seconds to ensure log backup has progressed beyond this timestamp - echo "Waiting for log backup to advance..." - sleep 5 - # stop log backup run_br log stop --task-name ${TASK_NAME}_pitr @@ -295,31 +391,8 @@ test_auto_restored_ts_conflict() { echo "First PiTR restore failed as expected, leaving paused registry entry" - echo "Checking for registry entries..." - - registry_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "TABLE_ERROR") - - echo "Debug: Registry check output: '$registry_check'" - - # Extract count from MySQL vertical format output - first_task_count=$(echo "$registry_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') - - if [ -z "$first_task_count" ] || [ "$first_task_count" -eq 0 ]; then - echo "Error: No tasks found in registry (count: ${first_task_count:-0})" - echo "Raw output: '$registry_check'" - return 1 - fi - - echo "Found $first_task_count task(s) in registry" - - # Get basic task information for verification - echo "Debug: Checking task details..." - run_sql "SELECT id, status, filter_strings FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' ORDER BY id DESC LIMIT 1;" || { - echo "Warning: Could not query task details, continuing..." - return 0 - } - - echo "PASS: Registry verification completed (found entries as expected)" + # Verify that a task was created in the registry + verify_registry "filter_strings = '$DB.*'" true "Test 1: Failed restore should create registry entry" # Test 2: Retry without explicit restored-ts (auto-detection) - should resume existing paused task echo "Test 2: Retry without explicit restored-ts (auto-detection)..." @@ -330,25 +403,12 @@ test_auto_restored_ts_conflict() { # Verify the restore succeeded for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB.pitr_$i;" | grep $i + verify_table_data $DB "pitr" $i done echo "PASS: Second restore with auto-detection succeeded by resuming existing task" - # Check registry state after successful restore - echo "Checking registry state after restore..." 
- registry_count_after=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "0") - echo "Debug: Registry entries after restore: '$registry_count_after'" - - final_count=$(echo "$registry_count_after" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') - final_count=${final_count:-0} - - if [ "$final_count" -eq 0 ]; then - echo "PASS: Task was completed and cleaned up from registry" - else - echo "ERROR: Task may still exist in registry (count: $final_count)" - # Show registry state for debugging - run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "Could not query details" - fi + # Check registry state after successful restore - should be cleaned up + verify_registry "filter_strings = '$DB.*'" false "Test 2: Successful restore should clean up registry" # Clean up any remaining registry entries run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" @@ -368,27 +428,13 @@ test_auto_restored_ts_conflict() { exit 1 fi export GO_FAILPOINTS="" - + # Now manually change the paused task to running status to simulate a stuck running task echo "Manually changing paused task to running status to simulate stuck task..." run_sql "UPDATE mysql.tidb_restore_registry SET status = 'running' WHERE filter_strings = '$DB.*' AND status = 'paused';" # Verify that we actually have a running task after the update - echo "Verifying that we have a running task..." - stuck_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' AND status = 'running';" 2>/dev/null || echo "0") - echo "Debug: Running task check: '$stuck_check'" - - # Verify we have exactly 1 running task before attempting restore - stuck_count=$(echo "$stuck_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') - stuck_count=${stuck_count:-0} - - if [ "$stuck_count" -eq 0 ]; then - echo "ERROR: Failed to create running task for stale detection test" - echo "Expected to have at least 1 running task after manual status update" - exit 1 - fi - - echo "Successfully created $stuck_count running task(s) for stale detection test" + verify_registry "filter_strings = '$DB.*' AND status = 'running'" true "Test 3: Should have running task for stale detection test" # Try to restore without explicit restored-ts - should detect stale task, transition it to paused, and reuse it echo "Attempting restore without explicit restored-ts (testing stale detection and reuse)..." @@ -399,79 +445,253 @@ test_auto_restored_ts_conflict() { # Verify the restore succeeded by checking the data echo "Verifying restore succeeded..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB.pitr_$i;" | grep $i + verify_table_data $DB "pitr" $i done echo "PASS: Stale task detection and reuse worked correctly" # Check final task status - should be cleaned up after successful restore - echo "Checking final task status after successful restore..." 
- final_task_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "0") - final_task_count=$(echo "$final_task_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') - final_task_count=${final_task_count:-0} + verify_registry "filter_strings = '$DB.*'" false "Test 3: Successful restore should clean up registry" - if [ "$final_task_count" -eq 0 ]; then - echo "PASS: Task was completed and cleaned up from registry" - else - echo "ERROR: Task should have been cleaned up but $final_task_count task(s) remain in registry" - run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" 2>/dev/null || echo "Could not query task details" - exit 1 - fi + # Test 4: Test that user can resume paused task with same explicit RestoreTS + echo "Test 4: Testing user can resume paused task with same explicit RestoreTS..." - # Test 4: Test with user-specified RestoreTS (should bypass conflict resolution) - echo "Test 4: Testing user-specified RestoreTS (should bypass all conflict resolution)..." + # Clean up any existing data from previous tests (use $DB since backup only contains $DB data) + run_sql "drop database if exists $DB;" - # Use DB2 for this test to avoid table conflicts with existing DB - run_sql "drop database if exists $DB2;" - - # Create a paused task for DB2 first + # Create a paused task for $DB first export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)" restore_fail=0 - run_br restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" || restore_fail=1 + run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" || restore_fail=1 if [ $restore_fail -ne 1 ]; then echo 'expecting restore to fail before completion but succeeded' exit 1 fi export GO_FAILPOINTS="" - echo "Checking for created paused task for $DB2..." - run_sql "SELECT id, restored_ts FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB2.*' ORDER BY id DESC LIMIT 1;" 2>/dev/null || echo "Could not query paused task details" + # Verify the paused task was created + verify_registry "filter_strings = '$DB.*'" true "Test 4: Failed restore should create registry entry" - # Now try with different user-specified RestoreTS - should create new task, not reuse existing - different_ts=$((log_backup_ts - 1000000)) # Slightly different timestamp - echo "Attempting restore with user-specified RestoreTS ($different_ts) different from existing task..." + # Get the task ID for verification + task_info_before=$(run_sql "SELECT id, status FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*' AND status = 'paused' ORDER BY id DESC LIMIT 1;" 2>/dev/null || echo "") + echo "Task info before resume: $task_info_before" - run_br restore point --filter "$DB2.*" --restored-ts $different_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" + # Now try to resume with the same user-specified RestoreTS - should reuse existing task + echo "Attempting to resume with same user-specified RestoreTS ($log_backup_ts)..." 
- # Verify restore succeeded - Note: The backup contains pitr_* tables, not full_* tables + run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$PITR_BACKUP_DIR" -s "$PITR_LOG_BACKUP_DIR" + + # Verify restore succeeded - check that the expected tables exist and have correct data + echo "Verifying restore succeeded..." for i in $(seq 1 $TABLE_COUNT); do - run_sql "select c from $DB2.pitr_$i;" | grep $i + verify_table_data $DB "pitr" $i done - echo "PASS: User-specified RestoreTS bypassed conflict resolution correctly" + + # Verify that the same task was resumed (not a new task created) + verify_registry "filter_strings = '$DB.*'" false "Test 4: Task should be cleaned up after successful restore" + + echo "PASS: User-specified RestoreTS correctly resumed existing paused task" # Final cleanup run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB.*';" - run_sql "DELETE FROM mysql.tidb_restore_registry WHERE filter_strings = '$DB2.*';" - echo "Final registry state check:" - final_check=$(run_sql "SELECT COUNT(*) FROM mysql.tidb_restore_registry WHERE filter_strings LIKE '$DB%' OR filter_strings LIKE '$DB2%';" 2>/dev/null || echo "0") - echo "Debug: Final registry check: '$final_check'" - - final_remaining=$(echo "$final_check" | grep -o 'COUNT.*: [0-9]*' | grep -o '[0-9]*') - final_remaining=${final_remaining:-0} - - if [ "$final_remaining" -eq 0 ]; then - echo "PASS: Registry is clean" - else - echo "Note: $final_remaining task(s) remain in registry" - run_sql "SELECT id, status, filter_strings FROM mysql.tidb_restore_registry WHERE filter_strings LIKE '$DB%' OR filter_strings LIKE '$DB2%';" 2>/dev/null || echo "Could not query remaining task details" - fi + # Final registry state check + verify_registry "filter_strings LIKE '$DB%'" false "Final cleanup: Registry should be clean" echo "Intelligent RestoreTS resolution test completed successfully" - echo "This test validates the new behavior:" - echo "1. Failed restore leaves paused task in registry" - echo "2. Retry without explicit restored-ts resumes existing paused task" - echo "3. Stale running task detection works (when applicable)" - echo "4. User-specified restored-ts bypasses conflict resolution" + cleanup +} + +test_restore_abort() { + echo "Test Case 6: Comprehensive restore abort functionality for all restore types" + + # use separate backup directories for this test + ABORT_BACKUP_DIR="local://$TEST_DIR/abort_backup" + ABORT_LOG_BACKUP_DIR="local://$TEST_DIR/abort_log_backup" + + echo "Setting up backup data for abort testing..." 
+ + # create initial data for multiple databases + run_sql "create database if not exists $DB;" + run_sql "create database if not exists $DB2;" + run_sql "create database if not exists $DB3;" + run_sql "create database if not exists $DB4;" + + create_tables_with_values "abort" $TABLE_COUNT $DB + create_tables_with_values "abort" $TABLE_COUNT $DB2 + create_tables_with_values "abort" $TABLE_COUNT $DB3 + create_tables_with_values "abort" $TABLE_COUNT $DB4 + + # start log backup + run_br log start --task-name ${TASK_NAME}_abort -s "$ABORT_LOG_BACKUP_DIR" + + # take snapshot backup + run_br backup full -s "$ABORT_BACKUP_DIR" + + # add more data after snapshot backup + run_sql "create database if not exists ${DB}_after_snapshot;" + create_tables_with_values "after" $TABLE_COUNT ${DB}_after_snapshot + + # wait for log checkpoint to advance + log_backup_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") + echo "Using log backup timestamp: $log_backup_ts" + . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance ${TASK_NAME}_abort + + # stop log backup + run_br log stop --task-name ${TASK_NAME}_abort + + # clean up source data + run_sql "drop database $DB;" + run_sql "drop database $DB2;" + run_sql "drop database $DB3;" + run_sql "drop database $DB4;" + run_sql "drop database ${DB}_after_snapshot;" + + echo "=== Step 1: Create All Paused Restore Tasks ===" + + export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)" + + echo "Creating paused PiTR restore task..." + restore_fail=0 + run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting PiTR restore to fail before completion but succeeded' + exit 1 + fi + + echo "Creating paused full restore task..." + restore_fail=0 + run_br restore full --filter "$DB2.*" -s "$ABORT_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting full restore to fail before completion but succeeded' + exit 1 + fi + + echo "Creating paused database restore task..." + restore_fail=0 + run_br restore db --db "$DB3" -s "$ABORT_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting db restore to fail before completion but succeeded' + exit 1 + fi + + echo "Creating paused table restore task..." 
+ restore_fail=0 + run_br restore table --db "$DB4" --table "abort_1" -s "$ABORT_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting table restore to fail before completion but succeeded' + exit 1 + fi + + export GO_FAILPOINTS="" + + echo "=== Step 2: Verify All Paused Tasks Exist ===" + + # Verify all 4 paused tasks exist in registry + verify_registry "filter_strings = '$DB.*' AND status = 'paused'" true "paused PiTR task" + verify_registry "filter_strings = '$DB2.*' AND status = 'paused'" true "paused full restore task" + verify_registry "cmd = 'DataBase Restore' AND status = 'paused'" true "paused database restore task" + verify_registry "cmd = 'Table Restore' AND status = 'paused'" true "paused table restore task" + + # Verify that snapshot checkpoint databases were created for the 4 restore tasks + verify_snapshot_checkpoint_databases_exist 4 "after creating 4 paused restore tasks" + + # Debug: Print entire registry contents before abort + echo "DEBUG: Registry contents before abort attempt:" + run_sql "SELECT id, filter_strings, start_ts, restored_ts, upstream_cluster_id, with_sys_table, status, cmd, last_heartbeat_time FROM mysql.tidb_restore_registry;" + cat "$TEST_DIR/sql_res.$TEST_NAME.txt" + + echo "=== Step 3: Abort All Restore Tasks ===" + + echo "Aborting PiTR restore task..." + run_br abort restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" + + echo "Aborting full restore task..." + run_br abort restore full --filter "$DB2.*" -s "$ABORT_BACKUP_DIR" + + echo "Aborting database restore task..." + run_br abort restore db --db "$DB3" -s "$ABORT_BACKUP_DIR" + + echo "Aborting table restore task..." + run_br abort restore table --db "$DB4" --table "abort_1" -s "$ABORT_BACKUP_DIR" + + echo "=== Step 4: Verify All Tasks Deleted ===" + + # Verify all tasks were deleted from registry + verify_registry "filter_strings = '$DB.*'" false "PiTR task deletion" + verify_registry "filter_strings = '$DB2.*'" false "full restore task deletion" + verify_registry "cmd = 'DataBase Restore'" false "database restore task deletion" + verify_registry "cmd = 'Table Restore'" false "table restore task deletion" + + # Verify no paused tasks remain + verify_registry "status = 'paused'" false "all paused tasks cleaned up" + + # Verify checkpoint cleanup for all restore types + verify_checkpoint_cleanup "All restore types abort" + + # Clean up all user databases since the aborted restores may have left partial data + echo "Cleaning up all user databases after abort test..." + run_sql "drop database if exists $DB;" + run_sql "drop database if exists $DB2;" + run_sql "drop database if exists $DB3;" + run_sql "drop database if exists $DB4;" + run_sql "drop database if exists ${DB}_after_snapshot;" + echo "User database cleanup completed" + + echo "=== Step 5: Special Cases ===" + + echo "Abort non-existent task (should succeed gracefully)..." + + # Try to abort when no matching task exists + run_br abort restore point --filter "${DB}_nonexistent.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" + + echo "PASS: Abort of non-existent task completed gracefully" + + echo "Abort with auto-detected restored-ts..." 
+ + # Create another paused PiTR task + export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)" + restore_fail=0 + run_br restore point --filter "$DB.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting restore to fail before completion but succeeded' + exit 1 + fi + export GO_FAILPOINTS="" + + # Verify paused task exists + verify_registry "filter_strings = '$DB.*' AND status = 'paused'" true "paused task creation for auto-detection test" + + # Abort without specifying restored-ts (should use auto-detection) + run_br abort restore point --filter "$DB.*" --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" + + # Verify task was deleted + verify_registry "filter_strings = '$DB.*'" false "task deletion after abort with auto-detection" + + echo "Try to abort a stale running task (should detect it's dead and clean it up)..." + + # Create a paused task and manually change it to running + export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/fail-at-end-of-restore=return(true)" + restore_fail=0 + run_br restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo 'expecting restore to fail before completion but succeeded' + exit 1 + fi + export GO_FAILPOINTS="" + + # Change status to running + run_sql "UPDATE mysql.tidb_restore_registry SET status = 'running' WHERE filter_strings = '$DB2.*' AND status = 'paused';" + + # Verify we have a running task + verify_registry "filter_strings = '$DB2.*' AND status = 'running'" true "stale running task creation for abort test" + + # Try to abort the stale running task - should detect it's dead and clean it up + run_br abort restore point --filter "$DB2.*" --restored-ts $log_backup_ts --full-backup-storage "$ABORT_BACKUP_DIR" -s "$ABORT_LOG_BACKUP_DIR" + + # Verify task was deleted (abort should have detected it was stale and cleaned it up) + verify_registry "filter_strings = '$DB2.*'" false "stale running task should be deleted after abort" + + echo "Comprehensive restore abort functionality test completed successfully" cleanup } @@ -482,5 +702,6 @@ test_mixed_parallel_restores test_concurrent_restore_table_conflicts test_restore_with_different_systable_settings test_auto_restored_ts_conflict +test_restore_abort echo "Parallel restore tests completed successfully"
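
Usage note (editorial, not part of the patch): the new `br abort restore` subcommands reuse the flag sets of the corresponding restore subcommands, so aborting a task takes the same arguments as the restore invocation that created it. A minimal sketch based on the invocations exercised in br_tests/br_parallel_restore/run.sh above; the storage URLs, database names, and filters are placeholders:

    # Abort a paused or stale full (snapshot) restore matching the filter and backup storage.
    br abort restore full --filter "db1.*" -s "local:///path/to/backup"

    # Abort a database-level or table-level restore.
    br abort restore db --db "db1" -s "local:///path/to/backup"
    br abort restore table --db "db1" --table "t1" -s "local:///path/to/backup"

    # Abort a point-in-time restore; --restored-ts may be omitted, in which case the
    # command resolves the timestamp the same way a retried restore would.
    br abort restore point --filter "db1.*" \
        --full-backup-storage "local:///path/to/full_backup" -s "local:///path/to/log_backup"

Per the registry changes in this patch, an abort only deletes tasks that are paused or whose heartbeat shows them to be stale; an actively running task is left untouched, and checkpoint data for the deleted task is cleaned up afterwards.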