br: directly use the rewrite rule from schemaReplaces (#43190)
* draft Signed-off-by: Leavrth <jianjun.liao@outlook.com> * draft Signed-off-by: Leavrth <jianjun.liao@outlook.com> * make bazel_prepare Signed-off-by: Leavrth <jianjun.liao@outlook.com> --------- Signed-off-by: Leavrth <jianjun.liao@outlook.com>
This commit is contained in:
@ -83,7 +83,6 @@ go_library(
|
||||
"@org_golang_x_sync//errgroup",
|
||||
"@org_uber_go_multierr//:multierr",
|
||||
"@org_uber_go_zap//:zap",
|
||||
"@org_uber_go_zap//zapcore",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@ -57,7 +57,6 @@ import (
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
@ -1344,12 +1343,7 @@ func restoreStream(
|
||||
return errors.Annotate(err, "failed to restore meta files")
|
||||
}
|
||||
|
||||
// perform restore kv files
|
||||
rewriteRules, err := initRewriteRules(client, fullBackupTables)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
updateRewriteRules(rewriteRules, schemasReplace)
|
||||
rewriteRules := initRewriteRules(schemasReplace)
|
||||
|
||||
ingestRecorder := schemasReplace.GetIngestRecorder()
|
||||
if err := client.RangeFilterFromIngestRecorder(ingestRecorder, rewriteRules); err != nil {
|
||||
@ -1702,48 +1696,8 @@ func initFullBackupTables(
|
||||
return tables, nil
|
||||
}
|
||||
|
||||
func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) (map[int64]*restore.RewriteRules, error) {
|
||||
// compare table exists in cluster and map[table]table.Info to get rewrite rules.
|
||||
func initRewriteRules(schemasReplace *stream.SchemasReplace) map[int64]*restore.RewriteRules {
|
||||
rules := make(map[int64]*restore.RewriteRules)
|
||||
for _, t := range tables {
|
||||
if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok {
|
||||
// skip system table for now
|
||||
continue
|
||||
}
|
||||
if t.Info == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name)
|
||||
if err != nil {
|
||||
// If table not existed, skip it directly.
|
||||
continue
|
||||
}
|
||||
// we don't handle index rule in pitr. since we only support pitr on non-exists table.
|
||||
tableRules := restore.GetRewriteRulesMap(newTableInfo, t.Info, 0, false)
|
||||
for tableID, tableRule := range tableRules {
|
||||
rules[tableID] = tableRule
|
||||
}
|
||||
|
||||
log.Info("Using rewrite rule for table.", zap.Stringer("table", t.Info.Name),
|
||||
zap.Stringer("database", t.DB.Name),
|
||||
zap.Int("old-id", int(t.Info.ID)),
|
||||
zap.Array("rewrite-rules", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
|
||||
for _, r := range tableRules {
|
||||
for _, rule := range r.Data {
|
||||
if err := ae.AppendObject(logutil.RewriteRuleObject(rule)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})),
|
||||
)
|
||||
}
|
||||
return rules, nil
|
||||
}
|
||||
|
||||
func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *stream.SchemasReplace) {
|
||||
filter := schemasReplace.TableFilter
|
||||
|
||||
for _, dbReplace := range schemasReplace.DbMap {
|
||||
@ -1774,6 +1728,7 @@ func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *s
|
||||
}
|
||||
}
|
||||
}
|
||||
return rules
|
||||
}
|
||||
|
||||
func newRawBatchClient(
|
||||
|
||||
Reference in New Issue
Block a user