diff --git a/br/pkg/pdutil/BUILD.bazel b/br/pkg/pdutil/BUILD.bazel index cec06fda4d..e13829118a 100644 --- a/br/pkg/pdutil/BUILD.bazel +++ b/br/pkg/pdutil/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//br/pkg/errors", "//br/pkg/httputil", + "//pkg/kv", "//pkg/store/pdtypes", "//pkg/tablecodec", "//pkg/util/codec", @@ -42,6 +43,7 @@ go_test( flaky = True, shard_count = 4, deps = [ + "//pkg/kv", "//pkg/store/mockstore/unistore", "//pkg/testkit/testsetup", "@com_github_coreos_go_semver//semver", diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index e25831530f..bf256caba5 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" @@ -74,6 +75,8 @@ type ClusterConfig struct { Schedulers []string `json:"schedulers"` // Original scheudle configuration ScheduleCfg map[string]any `json:"schedule_cfg"` + // The region rule ID registered + RuleID string `json:"rule_id"` } type pauseSchedulerBody struct { @@ -344,7 +347,29 @@ func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string return errors.Trace(p.resumeSchedulerWith(ctx, schedulers)) } +func (p *PdController) ResumeRegionLabelRule(ctx context.Context, ruleID string) { + if ruleID == "" { + return + } + ruleRet, err := p.pdHTTPCli.GetRegionLabelRulesByIDs(ctx, []string{ruleID}) + if err != nil || len(ruleRet) == 0 { + log.Warn("failed to get the region label rule, the rule may have been removed", zap.String("rule-id", ruleID)) + return + } + rule := ruleRet[0] + // Set ttl to 0 to remove the rule. 
+ rule.Labels[0].TTL = time.Duration(0).String() + deleteRule := &pdhttp.LabelRulePatch{DeleteRules: []string{ruleID}} + if err := p.pdHTTPCli.PatchRegionLabelRules(ctx, deleteRule); err != nil { + log.Warn("failed to delete region label rule, the rule will be removed after ttl expires", + zap.String("rule-id", rule.ID), zap.Error(err)) + } +} + func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string) (err error) { + if len(schedulers) == 0 { + return nil + } log.Info("resume scheduler", zap.Strings("schedulers", schedulers)) p.schedulerPauseCh <- struct{}{} @@ -413,6 +438,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg Cluster if err := pd.ResumeSchedulers(ctx, clusterCfg.Schedulers); err != nil { return errors.Annotate(err, "fail to add PD schedulers") } + pd.ResumeRegionLabelRule(ctx, clusterCfg.RuleID) log.Info("restoring config", zap.Any("config", clusterCfg.ScheduleCfg)) mergeCfg := make(map[string]any) for cfgKey := range configsNeedRestore { @@ -441,6 +467,14 @@ func (p *PdController) MakeUndoFunctionByConfig(config ClusterConfig) UndoFunc { return p.GenRestoreSchedulerFunc(config, expectPDCfgGenerators) } +func (p *PdController) MakeFineGrainedUndoFunction(config ClusterConfig, undoFunc func()) UndoFunc { + restore := func(ctx context.Context) error { + undoFunc() + return restoreSchedulers(ctx, p, config, expectPDCfgGenerators) + } + return restore +} + // GenRestoreSchedulerFunc gen restore func func (p *PdController) GenRestoreSchedulerFunc(config ClusterConfig, configsNeedRestore map[string]pauseConfigGenerator) UndoFunc { @@ -477,7 +511,9 @@ func (p *PdController) RemoveSchedulersWithConfig( return } - undo = p.MakeUndoFunctionByConfig(ClusterConfig{Schedulers: origin.Schedulers, ScheduleCfg: origin.ScheduleCfg}) + undo = p.MakeUndoFunctionByConfig( + ClusterConfig{Schedulers: origin.Schedulers, ScheduleCfg: origin.ScheduleCfg, RuleID: ""}, + ) return undo, &origin, errors.Trace(err) } @@ -590,6 +626,41 @@ func (p *PdController) RemoveSchedulersWithConfigGenerator( return originCfg, removedCfg, nil } +func (p *PdController) GetOriginPDConfig( + ctx context.Context, +) (origin ClusterConfig, err error) { + originCfg := ClusterConfig{} + scheduleCfg, err := p.GetPDScheduleConfig(ctx) + if err != nil { + return originCfg, errors.Trace(err) + } + originCfg.ScheduleCfg = scheduleCfg + + log.Debug("saved PD config", zap.Any("config", scheduleCfg)) + + return originCfg, nil +} + +// To resume the schedulers, call the cancel function. +// wait until done is finished to ensure schedulers all resumed +func (p *PdController) RemoveSchedulersOnRegion(ctx context.Context, keyRange [][2]kv.Key) (string, func(), error) { + done, ruleID, err := pauseSchedulerByKeyRangeWithTTL(ctx, p.pdHTTPCli, keyRange, pauseTimeout) + // Wait for the rule to take effect because the PD operator is processed asynchronously. + // To synchronize this, checking the operator status may not be enough. For details, see + // https://github.com/pingcap/tidb/issues/49477. + // Let's use two times default value of `patrol-region-interval` from PD configuration. 
+ <-time.After(20 * time.Millisecond) + + waitPauseSchedulerDone := func() { + if done == nil { + return + } + <-done + } + + return ruleID, waitPauseSchedulerDone, errors.Trace(err) +} + // RemoveSchedulersWithCfg removes pd schedulers and configs with specified ClusterConfig func (p *PdController) RemoveSchedulersWithCfg(ctx context.Context, removeCfg ClusterConfig) error { _, err := p.doRemoveSchedulersWith(ctx, removeCfg.Schedulers, removeCfg.ScheduleCfg) @@ -678,7 +749,7 @@ func PauseSchedulersByKeyRange( pdHTTPCli pdhttp.Client, startKey, endKey []byte, ) (done <-chan struct{}, err error) { - done, err = pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, startKey, endKey, pauseTimeout) + done, _, err = pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, [][2]kv.Key{{startKey, endKey}}, pauseTimeout) // Wait for the rule to take effect because the PD operator is processed asynchronously. // To synchronize this, checking the operator status may not be enough. For details, see // https://github.com/pingcap/tidb/issues/49477. @@ -690,9 +761,19 @@ func PauseSchedulersByKeyRange( func pauseSchedulerByKeyRangeWithTTL( ctx context.Context, pdHTTPCli pdhttp.Client, - startKey, endKey []byte, + keyRange [][2]kv.Key, ttl time.Duration, -) (<-chan struct{}, error) { +) (<-chan struct{}, string, error) { + var encodedKeyRangeRule = []KeyRangeRule{} + if len(keyRange) == 0 { + return nil, "", nil + } + for _, keyPair := range keyRange { + var rule KeyRangeRule + rule.StartKeyHex = hex.EncodeToString(keyPair[0]) + rule.EndKeyHex = hex.EncodeToString(keyPair[1]) + encodedKeyRangeRule = append(encodedKeyRangeRule, rule) + } rule := &pdhttp.LabelRule{ ID: uuid.New().String(), Labels: []pdhttp.RegionLabel{{ @@ -703,16 +784,13 @@ func pauseSchedulerByKeyRangeWithTTL( RuleType: "key-range", // Data should be a list of KeyRangeRule when rule type is key-range. // See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169. - Data: []KeyRangeRule{{ - StartKeyHex: hex.EncodeToString(startKey), - EndKeyHex: hex.EncodeToString(endKey), - }}, + Data: encodedKeyRangeRule, } done := make(chan struct{}) if err := pdHTTPCli.SetRegionLabelRule(ctx, rule); err != nil { close(done) - return nil, errors.Trace(err) + return nil, "", errors.Trace(err) } go func() { @@ -745,7 +823,7 @@ func pauseSchedulerByKeyRangeWithTTL( zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err)) } }() - return done, nil + return done, rule.ID, nil } // CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range. 
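For context on how the new pdutil helpers are meant to fit together, here is a minimal usage sketch (not part of the patch): it pauses region scheduling on one table's key range via RemoveSchedulersOnRegion and later resumes it by dropping the label rule with ResumeRegionLabelRule. The package name, function name, and error handling below are assumptions for illustration; only the PdController methods and the key encoding mirror the code above.

package restoresketch // illustrative sketch only, not part of this change

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/pdutil"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/util/codec"
)

// pauseForTable pauses region scheduling on [tableID, tableID+1) and returns a
// cleanup func that stops the background TTL-refresh loop and removes the rule,
// following the same cancel -> wait -> resume order used by the restore path.
func pauseForTable(ctx context.Context, p *pdutil.PdController, tableID int64) (func(), error) {
	pauseCtx, cancel := context.WithCancel(ctx)
	start := codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(tableID))
	end := codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(tableID+1))
	ruleID, waitDone, err := p.RemoveSchedulersOnRegion(pauseCtx, [][2]kv.Key{{start, end}})
	if err != nil {
		cancel()
		return nil, errors.Trace(err)
	}
	return func() {
		cancel()   // stop refreshing the label rule's TTL
		waitDone() // wait for the background pause loop to exit
		// Explicitly delete the rule as well; harmless if it is already gone.
		p.ResumeRegionLabelRule(ctx, ruleID)
	}, nil
}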
diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go index cfd531fff1..09f0d7bf29 100644 --- a/br/pkg/pdutil/pd_serial_test.go +++ b/br/pkg/pdutil/pd_serial_test.go @@ -14,6 +14,7 @@ import ( "github.com/coreos/go-semver/semver" perrors "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/mockstore/unistore" "github.com/stretchr/testify/require" pdhttp "github.com/tikv/pd/client/http" @@ -157,7 +158,9 @@ func TestPauseSchedulersByKeyRange(t *testing.T) { defer pdHTTPCli.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - done, err := pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl) + startKey := kv.Key{0, 0, 0, 0} + endKey := kv.Key{0xff, 0xff, 0xff, 0xff} + done, _, err := pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, [][2]kv.Key{{startKey, endKey}}, ttl) require.NoError(t, err) time.Sleep(ttl * 3) cancel() diff --git a/br/pkg/restore/import_mode_switcher.go b/br/pkg/restore/import_mode_switcher.go index c38b97ba23..e47a5d0972 100644 --- a/br/pkg/restore/import_mode_switcher.go +++ b/br/pkg/restore/import_mode_switcher.go @@ -15,6 +15,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/pkg/kv" tidbutil "github.com/pingcap/tidb/pkg/util" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -200,6 +201,45 @@ func RestorePreWork( return mgr.RemoveSchedulersWithConfig(ctx) } +func FineGrainedRestorePreWork( + ctx context.Context, + mgr *conn.Mgr, + switcher *ImportModeSwitcher, + keyRange [][2]kv.Key, + isOnline bool, + switchToImport bool, +) (pdutil.UndoFunc, *pdutil.ClusterConfig, error) { + if isOnline { + return pdutil.Nop, nil, nil + } + + if switchToImport { + // Switch TiKV cluster to import mode (adjust rocksdb configuration). + err := switcher.GoSwitchToImportMode(ctx) + if err != nil { + return pdutil.Nop, nil, err + } + } + + // pause config + originCfg, err := mgr.GetOriginPDConfig(ctx) + if err != nil { + return pdutil.Nop, nil, err + } + + // pause schedulers + ruleID, waitPauseSchedulerDone, err := mgr.RemoveSchedulersOnRegion(ctx, keyRange) + if err != nil { + return pdutil.Nop, nil, err + } + newCfg := originCfg + newCfg.RuleID = ruleID + + // handle undo + undo := mgr.MakeFineGrainedUndoFunction(newCfg, waitPauseSchedulerDone) + return undo, &originCfg, errors.Trace(err) +} + // RestorePostWork executes some post work after restore. // TODO: aggregate all lifetime manage methods into batcher's context manager field. func RestorePostWork( diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 01cb0729a0..3b3df7c9a6 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -124,6 +124,8 @@ type SnapClient struct { // use db pool to speed up restoration in BR binary mode. dbPool []*tidallocdb.DB + preallocedIDs *tidalloc.PreallocIDs + dom *domain.Domain // correspond to --tidb-placement-mode config. @@ -297,6 +299,9 @@ func (rc *SnapClient) SetPlacementPolicyMode(withPlacementPolicy string) { // the download stage. 
func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Table) error { preallocedTableIDs := tidalloc.New(tables) + if preallocedTableIDs == nil { + return errors.Errorf("failed to pre-alloc table IDs") + } ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error { return preallocedTableIDs.Alloc(meta.NewMutator(txn)) @@ -312,9 +317,25 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl if rc.db != nil { rc.db.RegisterPreallocatedIDs(preallocedTableIDs) } + rc.preallocedIDs = preallocedTableIDs return nil } +func (rc *SnapClient) GetPreAllocedTableIDRange() ([2]int64, error) { + if rc.preallocedIDs == nil { + return [2]int64{}, errors.Errorf("No preAlloced IDs") + } + + start, end := rc.preallocedIDs.GetIDRange() + + if start >= end { + log.Warn("PreAlloced IDs range is empty, no table to restore") + return [2]int64{start, end}, nil + } + + return [2]int64{start, end}, nil +} + // InitCheckpoint initialize the checkpoint status for the cluster. If the cluster is // restored for the first time, it will initialize the checkpoint metadata. Otherwrise, // it will load checkpoint metadata and checkpoint ranges/checksum from the external @@ -411,7 +432,7 @@ func (rc *SnapClient) InitCheckpoint( rc.restoreUUID = restoreID // a nil config means undo function if config != nil { - meta.SchedulersConfig = &pdutil.ClusterConfig{Schedulers: config.Schedulers, ScheduleCfg: config.ScheduleCfg} + meta.SchedulersConfig = &pdutil.ClusterConfig{Schedulers: config.Schedulers, ScheduleCfg: config.ScheduleCfg, RuleID: config.RuleID} } if err := snapshotCheckpointMetaManager.SaveCheckpointMetadata(ctx, meta); err != nil { return checkpointSetWithTableID, nil, errors.Trace(err) diff --git a/br/pkg/restore/snap_client/export_test.go b/br/pkg/restore/snap_client/export_test.go index a81d7e5e7a..cd02245479 100644 --- a/br/pkg/restore/snap_client/export_test.go +++ b/br/pkg/restore/snap_client/export_test.go @@ -77,6 +77,7 @@ func (rc *SnapClient) CreateTablesTest( newTS uint64, ) (*restoreutils.RewriteRules, []*model.TableInfo, error) { rc.dom = dom + rc.AllocTableIDs(context.TODO(), tables) rewriteRules := &restoreutils.RewriteRules{ Data: make([]*import_sstpb.RewriteRule, 0), } diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index e4edcb1012..ffd03967e9 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -64,9 +64,11 @@ go_library( "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/vardef", "//pkg/statistics/handle", + "//pkg/tablecodec", "//pkg/types", "//pkg/util", "//pkg/util/cdcutil", + "//pkg/util/codec", "//pkg/util/collate", "//pkg/util/engine", "//pkg/util/table-filter", @@ -117,7 +119,7 @@ go_test( ], embed = [":task"], flaky = True, - shard_count = 40, + shard_count = 41, deps = [ "//br/pkg/backup", "//br/pkg/config", @@ -136,6 +138,7 @@ go_test( "//br/pkg/utiltest", "//pkg/config", "//pkg/ddl", + "//pkg/kv", "//pkg/meta/model", "//pkg/parser/ast", "//pkg/parser/mysql", @@ -143,6 +146,7 @@ go_test( "//pkg/tablecodec", "//pkg/testkit", "//pkg/types", + "//pkg/util/codec", "//pkg/util/table-filter", "@com_github_docker_go_units//:go-units", "@com_github_gogo_protobuf//proto", diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index dd2d485445..a8c77b73a6 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -6,6 +6,7 @@ import ( "cmp" "context" "fmt" + "os" "slices" "strings" "sync/atomic" @@ -27,6 
+28,7 @@ import ( "github.com/pingcap/tidb/br/pkg/httputil" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" @@ -42,7 +44,9 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/engine" "github.com/spf13/cobra" @@ -1025,27 +1029,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s return cfg.piTRTaskInfo.FullRestoreCheckErr } - importModeSwitcher := restore.NewImportModeSwitcher(mgr.GetPDClient(), cfg.Config.SwitchModeInterval, mgr.GetTLSConfig()) - restoreSchedulersFunc, schedulersConfig, err := restore.RestorePreWork(ctx, mgr, importModeSwitcher, cfg.Online, true) - if err != nil { - return errors.Trace(err) - } - - // need to know whether restore has been completed so can restore schedulers - canRestoreSchedulers := false - defer func() { - // don't reset pd scheduler if checkpoint mode is used and restored is not finished - if cfg.UseCheckpoint && !canRestoreSchedulers { - log.Info("skip removing pd scheduler for next retry") - return - } - log.Info("start to restore pd scheduler") - // run the post-work to avoid being stuck in the import - // mode or emptied schedulers. - restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulersFunc, cfg.Online) - log.Info("finish restoring pd scheduler") - }() - if isFullRestore(cmdName) { if client.NeedCheckFreshCluster(cfg.ExplicitFilter, checkpointFirstRun) { if err = client.CheckTargetClusterFresh(ctx); err != nil { @@ -1061,7 +1044,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } } else if client.IsFull() && checkpointFirstRun && cfg.CheckRequirements { if err := checkTableExistence(ctx, mgr, tables); err != nil { - canRestoreSchedulers = true return errors.Trace(err) } } @@ -1077,31 +1059,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s return errors.Trace(err) } - // reload or register the checkpoint - var checkpointSetWithTableID map[int64]map[string]struct{} - if cfg.UseCheckpoint { - logRestoredTS := uint64(0) - if cfg.piTRTaskInfo != nil { - logRestoredTS = cfg.piTRTaskInfo.RestoreTS - } - sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint( - ctx, cfg.snapshotCheckpointMetaManager, schedulersConfig, logRestoredTS, checkpointFirstRun) - if err != nil { - return errors.Trace(err) - } - if restoreSchedulersConfigFromCheckpoint != nil { - restoreSchedulersFunc = mgr.MakeUndoFunctionByConfig(*restoreSchedulersConfigFromCheckpoint) - } - checkpointSetWithTableID = sets - - defer func() { - // need to flush the whole checkpoint data so that br can quickly jump to - // the log kv restore step when the next retry. 
- log.Info("wait for flush checkpoint...") - client.WaitForFinishCheckpoint(ctx, len(cfg.FullBackupStorage) > 0 || !canRestoreSchedulers) - }() - } - err = client.InstallPiTRSupport(ctx, snapclient.PiTRCollDep{ PDCli: mgr.GetPDClient(), EtcdCli: mgr.GetDomain().GetEtcdClient(), @@ -1246,6 +1203,100 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } } + importModeSwitcher := restore.NewImportModeSwitcher(mgr.GetPDClient(), cfg.Config.SwitchModeInterval, mgr.GetTLSConfig()) + var restoreSchedulersFunc pdutil.UndoFunc + var schedulersConfig *pdutil.ClusterConfig + if (isFullRestore(cmdName) && !cfg.ExplicitFilter) || client.IsIncremental() { + restoreSchedulersFunc, schedulersConfig, err = restore.RestorePreWork(ctx, mgr, importModeSwitcher, cfg.Online, true) + } else { + var preAllocRange [2]int64 + preAllocRange, err = client.GetPreAllocedTableIDRange() + if err != nil { + return errors.Trace(err) + } + var tableIDs []int64 + for _, table := range createdTables { + tableIDs = append(tableIDs, table.Table.ID) + if table.Table.Partition != nil { + for _, p := range table.Table.Partition.Definitions { + tableIDs = append(tableIDs, p.ID) + } + } + } + keyRange := SortKeyRanges(tableIDs, preAllocRange) + restoreSchedulersFunc, schedulersConfig, err = restore.FineGrainedRestorePreWork(ctx, mgr, importModeSwitcher, keyRange, cfg.Online, true) + } + if err != nil { + return errors.Trace(err) + } + + // need to know whether restore has been completed so can restore schedulers + canRestoreSchedulers := false + defer func() { + cancel() + // don't reset pd scheduler if checkpoint mode is used and restored is not finished + if cfg.UseCheckpoint && !canRestoreSchedulers { + log.Info("skip removing pd scheduler for next retry") + return + } + log.Info("start to restore pd scheduler") + // run the post-work to avoid being stuck in the import + // mode or emptied schedulers. + restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulersFunc, cfg.Online) + log.Info("finish restoring pd scheduler") + }() + + // reload or register the checkpoint + var checkpointSetWithTableID map[int64]map[string]struct{} + if cfg.UseCheckpoint { + logRestoredTS := uint64(0) + if cfg.piTRTaskInfo != nil { + logRestoredTS = cfg.piTRTaskInfo.RestoreTS + } + sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint( + ctx, cfg.snapshotCheckpointMetaManager, schedulersConfig, logRestoredTS, checkpointFirstRun) + if err != nil { + return errors.Trace(err) + } + if restoreSchedulersConfigFromCheckpoint != nil { + // The last range rule will be dropped when the last restore quits. + restoreSchedulersConfigFromCheckpoint.RuleID = schedulersConfig.RuleID + restoreSchedulersFunc = mgr.MakeUndoFunctionByConfig(*restoreSchedulersConfigFromCheckpoint) + } + checkpointSetWithTableID = sets + + defer func() { + // need to flush the whole checkpoint data so that br can quickly jump to + // the log kv restore step when the next retry. 
+ log.Info("wait for flush checkpoint...") + client.WaitForFinishCheckpoint(ctx, len(cfg.FullBackupStorage) > 0 || !canRestoreSchedulers) + }() + } + + failpoint.Inject("sleep_for_check_scheduler_status", func(val failpoint.Value) { + fileName, ok := val.(string) + func() { + if !ok { + return + } + _, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if osErr != nil { + log.Warn("failed to create file", zap.Error(osErr)) + return + } + }() + for { + _, statErr := os.Stat(fileName) + if os.IsNotExist(statErr) { + break + } else if statErr != nil { + log.Warn("error checking file", zap.Error(statErr)) + break + } + time.Sleep(1 * time.Second) + } + }) + if cfg.tiflashRecorder != nil { for _, createdTable := range createdTables { cfg.tiflashRecorder.Rewrite(createdTable.OldTable.Info.ID, createdTable.Table.ID) @@ -1921,6 +1972,88 @@ func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (ds return } +func SortKeyRanges(ids []int64, preAlloced [2]int64) [][2]kv.Key { + if len(ids) == 0 { + return nil + } + slices.Sort(ids) + idRanges := calSortedTableIds(ids) + + if preAlloced[0] < preAlloced[1] { + overlap := false + for _, r := range idRanges { + if r[0] < preAlloced[1] && r[1] > preAlloced[0] { + overlap = true + break + } + } + if overlap { + idRanges = append(idRanges, []int64{preAlloced[0], preAlloced[1]}) + } + } + + mergedRanges := mergeIntervals(idRanges) + + keyRanges := make([][2]kv.Key, 0, len(mergedRanges)) + for _, r := range mergedRanges { + startKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(r[0])) + endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(r[1])) + keyRanges = append(keyRanges, [2]kv.Key{startKey, endKey}) + } + return keyRanges +} + +func calSortedTableIds(ids []int64) [][]int64 { + if len(ids) == 0 { + return [][]int64{} + } + + var idRanges [][]int64 + + start := ids[0] + end := start + 1 + + for i := 1; i < len(ids); i++ { + if ids[i] == ids[i-1]+1 { + end = ids[i] + 1 + } else { + idRanges = append(idRanges, []int64{start, end}) + start = ids[i] + end = start + 1 + } + } + idRanges = append(idRanges, []int64{start, end}) + + return idRanges +} + +func mergeIntervals(intervals [][]int64) [][]int64 { + if len(intervals) == 0 { + return nil + } + slices.SortFunc(intervals, func(a, b []int64) int { + if a[0] < b[0] { + return -1 + } else if a[0] > b[0] { + return 1 + } + return 0 + }) + merged := [][]int64{intervals[0]} + for i := 1; i < len(intervals); i++ { + last := merged[len(merged)-1] + current := intervals[i] + if current[0] <= last[1] { + if current[1] > last[1] { + last[1] = current[1] + } + } else { + merged = append(merged, current) + } + } + return merged +} + type DDLJobFilterRule func(ddlJob *model.Job) bool var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{ diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 71f3dccd0a..b578a70f09 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -26,11 +26,14 @@ import ( "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/codec" filter "github.com/pingcap/tidb/pkg/util/table-filter" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -864,3 +867,105 @@ func TestAdjustTablesToRestoreAndCreateTableTracker(t *testing.T) { }) } } + +func TestSortKeyRanges(t *testing.T) { + makeKeyRange := func(start, end int64) [2]kv.Key { + return [2]kv.Key{ + codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(start)), + codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(end)), + } + } + testCases := []struct { + name string + ids []int64 + preAlloced [2]int64 + expected [][2]kv.Key + }{ + { + name: "empty_ids", + ids: []int64{}, + preAlloced: [2]int64{100, 200}, + expected: nil, + }, + { + name: "single_id", + ids: []int64{5}, + preAlloced: [2]int64{0, 0}, + expected: [][2]kv.Key{makeKeyRange(5, 6)}, + }, + { + name: "prealloc_non_overlap", + ids: []int64{50, 51, 52, 53}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(50, 54)}, + }, + { + name: "prealloc_partial_overlap", + ids: []int64{95, 96, 97, 98, 99, 100, 101, 102, 170, 171, 172}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(95, 200)}, + }, + { + name: "prealloc_contains_all", + ids: []int64{120, 121, 122, 123, 150, 180}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(100, 200)}, + }, + { + name: "prealloc_left_adjacent", + ids: []int64{99, 100}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(99, 200)}, + }, + { + name: "prealloc_left_not_adjacent", + ids: []int64{99}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(99, 100)}, + }, + { + name: "prealloc_right_adjacent", + ids: []int64{200}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(200, 201)}, + }, + { + name: "prealloc_not_adjacent", + ids: []int64{201}, + preAlloced: [2]int64{100, 200}, + expected: [][2]kv.Key{makeKeyRange(201, 202)}, + }, + { + name: "multiple_merge", + ids: []int64{10, 11, 15, 16, 20}, + preAlloced: [2]int64{12, 18}, + expected: [][2]kv.Key{ + makeKeyRange(10, 18), + makeKeyRange(20, 21), + }, + }, + { + name: "full_merge", + ids: []int64{5, 6, 7}, + preAlloced: [2]int64{4, 8}, + expected: [][2]kv.Key{makeKeyRange(4, 8)}, + }, + { + name: "min_boundary", + ids: []int64{0}, + preAlloced: [2]int64{0, 1}, + expected: [][2]kv.Key{makeKeyRange(0, 1)}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := task.SortKeyRanges(tc.ids, tc.preAlloced) + require.Equal(t, len(tc.expected), len(result)) + + for i := range result { + require.Equal(t, tc.expected[i][0], result[i][0], "StartKey mismatch at index %d", i) + require.Equal(t, tc.expected[i][1], result[i][1], "EndKey mismatch at index %d", i) + } + }) + } +} diff --git a/br/tests/br_db_skip/run.sh b/br/tests/br_db_skip/run.sh index 58c39bb329..79fbddff8c 100755 --- a/br/tests/br_db_skip/run.sh +++ b/br/tests/br_db_skip/run.sh @@ -56,7 +56,7 @@ run_sql "CREATE TABLE $DB.usertable1 ( \ echo "restore start must succeed" fail=false -run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema=true || fail=true +run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema=true --check-requirements=false || fail=true if $fail; then echo "TEST: [$TEST_NAME] restore $DB with no-schema failed" exit 1 diff --git a/br/tests/br_region_rule/run.sh b/br/tests/br_region_rule/run.sh new file mode 100644 index 0000000000..015197689e --- /dev/null +++ b/br/tests/br_region_rule/run.sh @@ -0,0 +1,195 @@ +#!/bin/sh +# +# Copyright 2025 PingCAP, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +DB="$TEST_NAME" +TABLES_COUNT=300 +LOG_FILE="$TEST_DIR/log_file" +BACKUP_STORAGE="local://$TEST_DIR/${DB}" + +####################################### +# Create schema and sample tables for testing. +####################################### +create_test_tables() { + run_sql "create schema $DB;" + local i=1 + while [ $i -le $TABLES_COUNT ]; do + run_sql "create table $DB.sbtest$i ( + id int primary key, + k int not null, + c char(120) not null, + pad char(60) not null + );" + run_sql "insert into $DB.sbtest$i values ($i, $i, '$i', '$i');" + i=$((i+1)) + done +} + +####################################### +# Run backup for the entire db. +####################################### +backup_db() { + echo "Running backup for database: $DB ..." + run_br backup db --db "$DB" -s "$BACKUP_STORAGE" --pd "$PD_ADDR" + echo "Backup finished." +} + +####################################### +# Start a restore process in the background with custom arguments. +# Sets RESTORE_PID to the PID of the background restore. +# Arguments: +# $@ = additional arguments to pass to run_br (e.g., table list, flags) +####################################### +run_restore_in_background() { + run_br restore "$@" -s "$BACKUP_STORAGE" --pd "$PD_ADDR" & + RESTORE_PID=$! +} + +####################################### +# Wait for the checkpoint stage: +# - Waits for $LOG_FILE creation +# - Checks if region label rule is present (schedule=deny) +# If not found, kills the restore process and exits with failure. +####################################### +wait_for_checkpoint_stage() { + echo "Monitoring checkpoint stage (waiting for $LOG_FILE creation)..." + while [ ! -f "$LOG_FILE" ]; do + sleep 1 + done + ensure_region_label_rule_exists || { + echo "Error: Expected region label rule (schedule=deny) not found." + kill $RESTORE_PID + exit 1 + } + rm -f "$LOG_FILE" +} + +####################################### +# Check if region label rule 'schedule=deny' exists. +# Returns 0 if found, 1 if not found. +####################################### +check_region_label_rule_exists() { + local response exists + response=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules") + echo "$response" + exists=$(echo "$response" | jq 'any(.[]; .labels[]? | (.key=="schedule" and .value=="deny"))') + [ "$exists" = "true" ] +} + +####################################### +# Exits with 0 if 'schedule=deny' rule is found, +# otherwise returns 1 (for use in if-statements). +####################################### +ensure_region_label_rule_exists() { + check_region_label_rule_exists +} + +####################################### +# Exits with error if 'schedule=deny' rule is still present. +####################################### +ensure_region_label_rule_absent() { + if check_region_label_rule_exists; then + echo "Error: Region label rule (schedule=deny) should have been removed." + exit 1 + fi +} + +####################################### +# Perform restore test flow: +# 1) Drop schema $DB. 
+# 2) Start restore in background (with optional user-specified arguments). +# 3) Wait for checkpoint, check label rule exists, remove log, wait for restore to finish. +# 4) Check label rule is absent afterwards. +# Arguments: +# $@ = additional arguments passed to run_restore_in_background() +####################################### +perform_restore_test() { + run_sql "drop schema if exists $DB;" + run_restore_in_background "$@" + wait_for_checkpoint_stage + wait $RESTORE_PID + ensure_region_label_rule_absent +} + +####################################### +# MAIN TEST FLOW +####################################### + +# 1. Create tables and backup +create_test_tables +backup_db +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/sleep_for_check_scheduler_status=return(\"$LOG_FILE\")" + +# Test 1: Restore the whole db without checkpoint +echo "=== Test 1: restore the whole db without checkpoint ===" +perform_restore_test db --db "$DB" +echo "Test 1 finished successfully!" + +# Test 2: Restore random tables without checkpoint +echo "=== Test 2: restore random tables without checkpoint ===" +# We pick 50 random tables from 1..300 +TABLE_LIST=$(shuf -i 1-$TABLES_COUNT -n 50 | awk -v db="$DB" '{printf "-f %s.sbtest%s ", db, $1}') +perform_restore_test full $TABLE_LIST +echo "Test 2 finished successfully!" + +# Test 3: Attempt restore with checkpoint (inject corruption to force error) +echo "=== Test 3: restore with checkpoint (injected corruption) ===" +run_sql "drop schema if exists $DB;" + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/sleep_for_check_scheduler_status=return(\"$LOG_FILE\");github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"corrupt-last-table-files\")" + +run_restore_in_background full $TABLE_LIST +wait_for_checkpoint_stage + +set +e +wait $RESTORE_PID +exit_code=$? +set -e + +if [ $exit_code -eq 0 ]; then + echo "Error: restore unexpectedly succeeded despite corruption" + exit 1 +fi +ensure_region_label_rule_absent + +echo "=== Test 3: retry restore full db with checkpoint enabled ===" + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/sleep_for_check_scheduler_status=return(\"$LOG_FILE\")" +run_restore_in_background db --db "$DB" +wait_for_checkpoint_stage +wait $RESTORE_PID +ensure_region_label_rule_absent +echo "Test 3 finished successfully!" + +# Test 4: Restore full without checkpoint (check deny rule absent) +echo "=== Test4: restore full without checkpoint (check deny rule absent) ===" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/sleep_for_check_scheduler_status=return(\"$LOG_FILE\")" +run_sql "drop schema if exists $DB;" +run_restore_in_background full +echo "Monitoring checkpoint stage (expecting no deny rule)..." +while [ ! -f "$LOG_FILE" ]; do + sleep 1 +done +ensure_region_label_rule_absent +rm -f "$LOG_FILE" +wait $RESTORE_PID +ensure_region_label_rule_absent +echo "Test4 finished successfully!" + +export GO_FAILPOINTS="" +echo "All tests finished successfully!" 
+exit 0 \ No newline at end of file diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh index 99ff3693bf..12234f8062 100755 --- a/br/tests/run_group_br_tests.sh +++ b/br/tests/run_group_br_tests.sh @@ -24,7 +24,7 @@ groups=( ["G01"]="br_autoid br_crypter2 br_db br_check_dup_table br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash" ["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading" ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index' - ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table ' + ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table br_region_rule' ["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index' ["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption' ["G07"]='br_pitr' diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index c9e3a1f0dd..72bb7cdd63 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -63,26 +63,6 @@ type GcSafePoints struct { } `json:"service_gc_safe_points"` } -func verifyGCStopped(t *require.Assertions, cfg operator.PauseGcConfig) { - var result GcSafePoints - t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) - for _, sp := range result.SPs { - if sp.ServiceID != "gc_worker" { - t.Equal(int64(cfg.SafePoint)-1, sp.SafePoint, result.SPs) - } - } -} - -func verifyGCNotStopped(t *require.Assertions, cfg operator.PauseGcConfig) { - var result GcSafePoints - t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) - for _, sp := range result.SPs { - if sp.ServiceID != "gc_worker" { - t.FailNowf("the service gc safepoint exists", "it is %#v", sp) - } - } -} - func verifyTargetGCSafePointExist(t *require.Assertions, cfg operator.PauseGcConfig) { var result GcSafePoints t.NoError(getJSON(pdAPI(cfg, serviceGCSafepointPrefix), &result)) @@ -232,7 +212,6 @@ func TestOperator(t *testing.T) { } }, 10*time.Second, time.Second) - verifyGCStopped(req, cfg) verifyTargetGCSafePointExist(req, cfg) verifyLightningStopped(req, cfg) verifySchedulersStopped(req, cfg)
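As a worked illustration of the ID-range logic behind SortKeyRanges (calSortedTableIds plus mergeIntervals), the standalone sketch below reimplements the merge locally, since those helpers are unexported. The helper names idRangesOf and mergeRanges are local to this example; it reproduces the "multiple_merge" test case above, where IDs {10, 11, 15, 16, 20} and the pre-allocated range [12, 18) collapse to [10, 18) and [20, 21).

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// idRangesOf collapses table IDs into half-open [start, end) ID ranges,
// mirroring calSortedTableIds in br/pkg/task/restore.go. Assumes len(ids) > 0.
func idRangesOf(ids []int64) [][2]int64 {
	slices.Sort(ids)
	var out [][2]int64
	start, end := ids[0], ids[0]+1
	for _, id := range ids[1:] {
		if id == end {
			end = id + 1
			continue
		}
		out = append(out, [2]int64{start, end})
		start, end = id, id+1
	}
	return append(out, [2]int64{start, end})
}

// mergeRanges merges overlapping or adjacent ranges, like mergeIntervals above.
func mergeRanges(rs [][2]int64) [][2]int64 {
	slices.SortFunc(rs, func(a, b [2]int64) int { return cmp.Compare(a[0], b[0]) })
	merged := [][2]int64{rs[0]}
	for _, r := range rs[1:] {
		last := &merged[len(merged)-1]
		if r[0] <= last[1] {
			if r[1] > last[1] {
				last[1] = r[1]
			}
			continue
		}
		merged = append(merged, r)
	}
	return merged
}

func main() {
	// Same inputs as the "multiple_merge" case in TestSortKeyRanges.
	ranges := idRangesOf([]int64{10, 11, 15, 16, 20}) // [[10 12] [15 17] [20 21]]
	ranges = append(ranges, [2]int64{12, 18})         // pre-allocated IDs overlap [15 17)
	fmt.Println(mergeRanges(ranges))                  // [[10 18] [20 21]]
}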