From eca3e7af4489ee14ac374cdb92bc2deef4a7b248 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Mon, 24 Apr 2023 12:32:45 +0800 Subject: [PATCH] br: directly use the rewrite rule from schemaReplaces (#43190) * draft Signed-off-by: Leavrth * draft Signed-off-by: Leavrth * make bazel_prepare Signed-off-by: Leavrth --------- Signed-off-by: Leavrth --- br/pkg/task/BUILD.bazel | 1 - br/pkg/task/stream.go | 51 +++-------------------------------------- 2 files changed, 3 insertions(+), 49 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 95c5be93f0..7a4fa81406 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -83,7 +83,6 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", - "@org_uber_go_zap//zapcore", ], ) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index a6aef0db6c..e0c684d8be 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -57,7 +57,6 @@ import ( "github.com/tikv/client-go/v2/oracle" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/exp/slices" ) @@ -1344,12 +1343,7 @@ func restoreStream( return errors.Annotate(err, "failed to restore meta files") } - // perform restore kv files - rewriteRules, err := initRewriteRules(client, fullBackupTables) - if err != nil { - return errors.Trace(err) - } - updateRewriteRules(rewriteRules, schemasReplace) + rewriteRules := initRewriteRules(schemasReplace) ingestRecorder := schemasReplace.GetIngestRecorder() if err := client.RangeFilterFromIngestRecorder(ingestRecorder, rewriteRules); err != nil { @@ -1702,48 +1696,8 @@ func initFullBackupTables( return tables, nil } -func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) (map[int64]*restore.RewriteRules, error) { - // compare table exists in cluster and map[table]table.Info to get rewrite rules. +func initRewriteRules(schemasReplace *stream.SchemasReplace) map[int64]*restore.RewriteRules { rules := make(map[int64]*restore.RewriteRules) - for _, t := range tables { - if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok { - // skip system table for now - continue - } - if t.Info == nil { - continue - } - - newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name) - if err != nil { - // If table not existed, skip it directly. - continue - } - // we don't handle index rule in pitr. since we only support pitr on non-exists table. - tableRules := restore.GetRewriteRulesMap(newTableInfo, t.Info, 0, false) - for tableID, tableRule := range tableRules { - rules[tableID] = tableRule - } - - log.Info("Using rewrite rule for table.", zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name), - zap.Int("old-id", int(t.Info.ID)), - zap.Array("rewrite-rules", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error { - for _, r := range tableRules { - for _, rule := range r.Data { - if err := ae.AppendObject(logutil.RewriteRuleObject(rule)); err != nil { - return err - } - } - } - return nil - })), - ) - } - return rules, nil -} - -func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *stream.SchemasReplace) { filter := schemasReplace.TableFilter for _, dbReplace := range schemasReplace.DbMap { @@ -1774,6 +1728,7 @@ func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *s } } } + return rules } func newRawBatchClient(