// Files
// tidb/br/pkg/backup/schema.go
//
// 306 lines
// 8.9 KiB
// Go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package backup
import (
"context"
"encoding/json"
"time"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/checksum"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
	// DefaultSchemaConcurrency is the default number of the concurrent
	// backup schema tasks.
	DefaultSchemaConcurrency = 64
)
// schemaInfo carries everything collected for one schema entry: the table
// and database meta, the table checksum, and the dumped statistics.
// tableInfo may be nil for a database-only entry (BackupSchemas guards on it).
type schemaInfo struct {
	tableInfo *model.TableInfo
	dbInfo    *model.DBInfo
	// crc64xor/totalKvs/totalBytes hold the table checksum, either computed
	// via coprocessor (calculateChecksum) or restored from a checkpoint.
	crc64xor   uint64
	totalKvs   uint64
	totalBytes uint64
	// stats is the inline statistics JSON (marshaled into the schema);
	// statsIndex references stats files written by the stats writer instead.
	stats      *util.JSONTable
	statsIndex []*backuppb.StatsFileIndex
}
// iterFuncTp enumerates schemas in the storage, invoking the callback once
// per entry. The *model.TableInfo argument may be nil — presumably for a
// database with no table; confirm with the iterator implementations.
type iterFuncTp func(kv.Storage, func(*model.DBInfo, *model.TableInfo)) error
// Schemas is task for backuping schemas.
type Schemas struct {
	// iterFunc enumerates every schema entry to back up.
	iterFunc iterFuncTp
	// size is the expected number of schemas, reported by Len.
	size int
	// checkpoint: table id -> checksum
	checkpointChecksum map[int64]*checkpoint.ChecksumItem
}
// NewBackupSchemas builds a schema-backup task that enumerates schemas via
// iterFunc; size is the number of schemas the iterator will yield.
func NewBackupSchemas(iterFunc iterFuncTp, size int) *Schemas {
	schemas := new(Schemas)
	schemas.iterFunc = iterFunc
	schemas.size = size
	// checkpointChecksum stays nil until SetCheckpointChecksum is called.
	return schemas
}
// SetCheckpointChecksum installs checksums recovered from a checkpoint,
// keyed by table ID, so BackupSchemas can reuse them instead of
// recomputing via coprocessor.
func (ss *Schemas) SetCheckpointChecksum(checkpointChecksum map[int64]*checkpoint.ChecksumItem) {
	ss.checkpointChecksum = checkpointChecksum
}
// BackupSchemas backups table info, including checksum and stats.
// BackupSchemas backups table info, including checksum and stats.
//
// For each schema yielded by ss.iterFunc it schedules a task on a worker
// pool of `concurrency` goroutines that:
//   - restores the table checksum from checkpoint (if available) or
//     computes it via coprocessor with `copConcurrency`, unless skipChecksum;
//   - cross-checks the checksum against checksumMap when provided;
//   - dumps table statistics when statsHandle is non-nil;
//   - encodes the schema and streams it to metaWriter, bumping updateCh.
//
// The function blocks until all tasks finish, then closes the async meta
// stream via FinishWriteMetas.
func (ss *Schemas) BackupSchemas(
	ctx context.Context,
	metaWriter *metautil.MetaWriter,
	checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType],
	store kv.Storage,
	statsHandle *handle.Handle,
	backupTS uint64,
	checksumMap map[int64]*metautil.ChecksumStats,
	concurrency uint,
	copConcurrency uint,
	skipChecksum bool,
	updateCh glue.Progress,
) error {
	// Attach a child tracing span when the incoming context carries one.
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("Schemas.BackupSchemas", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}
	workerPool := tidbutil.NewWorkerPool(concurrency, "Schemas")
	errg, ectx := errgroup.WithContext(ctx)
	startAll := time.Now()
	op := metautil.AppendSchema
	metaWriter.StartWriteMetasAsync(ctx, op)
	err := ss.iterFunc(store, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
		// because the field of `dbInfo` would be modified, which affects the later iteration.
		// so copy the `dbInfo` for each to `newDBInfo`
		newDBInfo := *dbInfo
		schema := &schemaInfo{
			tableInfo: tableInfo,
			dbInfo:    &newDBInfo,
		}
		// System databases are recorded under a temporary database name.
		if utils.IsSysDB(schema.dbInfo.Name.L) {
			schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
		}
		// Look up a previously checkpointed checksum so an interrupted
		// backup does not recompute it.
		var checksum *checkpoint.ChecksumItem
		var exists = false
		if ss.checkpointChecksum != nil && schema.tableInfo != nil {
			checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
		}
		workerPool.ApplyOnErrorGroup(errg, func() error {
			if schema.tableInfo != nil {
				logger := log.L().With(
					zap.String("db", schema.dbInfo.Name.O),
					zap.String("table", schema.tableInfo.Name.O),
				)
				if !skipChecksum {
					logger.Info("Calculate table checksum start")
					if exists && checksum != nil {
						// Reuse the checkpointed result verbatim.
						schema.crc64xor = checksum.Crc64xor
						schema.totalKvs = checksum.TotalKvs
						schema.totalBytes = checksum.TotalBytes
						logger.Info("Calculate table checksum completed (from checkpoint)",
							zap.Uint64("Crc64Xor", schema.crc64xor),
							zap.Uint64("TotalKvs", schema.totalKvs),
							zap.Uint64("TotalBytes", schema.totalBytes))
					} else {
						start := time.Now()
						// ectx: a sibling task's failure cancels this
						// coprocessor checksum request.
						err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
						if err != nil {
							return errors.Trace(err)
						}
						calculateCost := time.Since(start)
						if checkpointRunner != nil {
							// if checkpoint runner is running and the checksum is not from checkpoint
							// then flush the checksum by the checkpoint runner
							// NOTE(review): this flush uses the outer ctx rather than ectx —
							// presumably so it is not aborted by a sibling failure; confirm.
							if err = checkpointRunner.FlushChecksum(ctx, schema.tableInfo.ID, schema.crc64xor, schema.totalKvs, schema.totalBytes); err != nil {
								return errors.Trace(err)
							}
						}
						logger.Info("Calculate table checksum completed",
							zap.Uint64("Crc64Xor", schema.crc64xor),
							zap.Uint64("TotalKvs", schema.totalKvs),
							zap.Uint64("TotalBytes", schema.totalBytes),
							zap.Duration("TimeTaken", calculateCost))
					}
					// Cross-check against the checksum aggregated from the
					// backed-up files, if the caller supplied one.
					if checksumMap != nil {
						if err := schema.matchChecksum(checksumMap); err != nil {
							return errors.Trace(err)
						}
					}
				}
				if statsHandle != nil {
					statsWriter := metaWriter.NewStatsWriter()
					if err := schema.dumpStatsToJSON(ctx, statsWriter, statsHandle, backupTS); err != nil {
						logger.Error("dump table stats failed", logutil.ShortError(err))
						return errors.Trace(err)
					}
				}
			}
			// Send schema to metawriter
			s, err := schema.encodeToSchema()
			if err != nil {
				return errors.Trace(err)
			}
			if err := metaWriter.Send(s, op); err != nil {
				return errors.Trace(err)
			}
			if updateCh != nil {
				updateCh.Inc()
			}
			return nil
		})
	})
	if err != nil {
		return errors.Trace(err)
	}
	// Wait for every per-table task before closing the async meta stream.
	if err := errg.Wait(); err != nil {
		return errors.Trace(err)
	}
	log.Info("Backup calculated table checksum into metas", zap.Duration("take", time.Since(startAll)))
	summary.CollectDuration("backup checksum", time.Since(startAll))
	return metaWriter.FinishWriteMetas(ctx, op)
}
// Len returns the number of schemas to be backed up.
func (ss *Schemas) Len() int {
	return ss.size
}
// calculateChecksum issues an admin-checksum request for s.tableInfo at the
// backupTS snapshot with the given coprocessor concurrency, and stores the
// resulting crc64xor/totalKvs/totalBytes on s.
func (s *schemaInfo) calculateChecksum(
	ctx context.Context,
	client kv.Client,
	backupTS uint64,
	concurrency uint,
) error {
	executor, err := checksum.NewExecutorBuilder(s.tableInfo, backupTS).
		SetExplicitRequestSourceType(kvutil.ExplicitTypeBR).
		SetConcurrency(concurrency).
		Build()
	if err != nil {
		return errors.Trace(err)
	}
	resp, err := executor.Execute(ctx, client, func() {
		// TODO: update progress here.
	})
	if err != nil {
		return errors.Trace(err)
	}
	s.crc64xor, s.totalKvs, s.totalBytes = resp.Checksum, resp.TotalKvs, resp.TotalBytes
	return nil
}
// matchChecksum checks that the checksum aggregated from backup files
// (summed over the table and all of its partitions) equals the checksum
// computed by the coprocessor; a mismatch is logged and returned as
// ErrBackupChecksumMismatch.
func (s *schemaInfo) matchChecksum(checksumMap map[int64]*metautil.ChecksumStats) error {
	var fileCrc, fileKvs, fileBytes uint64
	// accumulate folds one physical table's file checksum into the totals.
	accumulate := func(physicalID int64) {
		if item := checksumMap[physicalID]; item != nil {
			fileCrc ^= item.Crc64Xor
			fileKvs += item.TotalKvs
			fileBytes += item.TotalBytes
		}
	}
	accumulate(s.tableInfo.ID)
	if part := s.tableInfo.Partition; part != nil {
		for i := range part.Definitions {
			accumulate(part.Definitions[i].ID)
		}
	}
	if s.crc64xor == fileCrc && s.totalKvs == fileKvs && s.totalBytes == fileBytes {
		log.Info("checksum success",
			zap.Stringer("db", s.dbInfo.Name), zap.Stringer("table", s.tableInfo.Name))
		return nil
	}
	log.Error("checksum mismatch",
		zap.Stringer("db", s.dbInfo.Name),
		zap.Stringer("table", s.tableInfo.Name),
		zap.Uint64("origin tidb crc64", s.crc64xor),
		zap.Uint64("calculated crc64", fileCrc),
		zap.Uint64("origin tidb total kvs", s.totalKvs),
		zap.Uint64("calculated total kvs", fileKvs),
		zap.Uint64("origin tidb total bytes", s.totalBytes),
		zap.Uint64("calculated total bytes", fileBytes))
	return errors.Trace(berrors.ErrBackupChecksumMismatch)
}
// dumpStatsToJSON persists the table's statistics at the backupTS snapshot
// through statsWriter and records the resulting stats file indexes on s.
func (s *schemaInfo) dumpStatsToJSON(ctx context.Context, statsWriter *metautil.StatsWriter, statsHandle *handle.Handle, backupTS uint64) error {
	log.Info("dump stats to json", zap.Stringer("db", s.dbInfo.Name), zap.Stringer("table", s.tableInfo.Name))
	err := statsHandle.PersistStatsBySnapshot(
		ctx, s.dbInfo.Name.String(), s.tableInfo, backupTS, statsWriter.BackupStats,
	)
	if err != nil {
		return errors.Trace(err)
	}
	indexes, err := statsWriter.BackupStatsDone(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	s.statsIndex = indexes
	return nil
}
// encodeToSchema marshals the collected schema info into a backuppb.Schema.
// The database meta is always present; table meta and inline stats are only
// marshaled when set, otherwise left nil.
func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
	dbBytes, err := json.Marshal(s.dbInfo)
	if err != nil {
		return nil, errors.Trace(err)
	}
	out := &backuppb.Schema{
		Db:         dbBytes,
		Crc64Xor:   s.crc64xor,
		TotalKvs:   s.totalKvs,
		TotalBytes: s.totalBytes,
		StatsIndex: s.statsIndex,
	}
	if s.tableInfo != nil {
		if out.Table, err = json.Marshal(s.tableInfo); err != nil {
			return nil, errors.Trace(err)
		}
	}
	if s.stats != nil {
		if out.Stats, err = json.Marshal(s.stats); err != nil {
			return nil, errors.Trace(err)
		}
	}
	return out, nil
}