322 lines
10 KiB
Go
322 lines
10 KiB
Go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
|
|
|
|
package backup_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/kvproto/pkg/encryptionpb"
|
|
"github.com/pingcap/tidb/br/pkg/backup"
|
|
"github.com/pingcap/tidb/br/pkg/metautil"
|
|
"github.com/pingcap/tidb/br/pkg/mock"
|
|
"github.com/pingcap/tidb/br/pkg/utils"
|
|
"github.com/pingcap/tidb/pkg/objstore"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
filter "github.com/pingcap/tidb/pkg/util/table-filter"
|
|
"github.com/stretchr/testify/require"
|
|
"go.opencensus.io/stats/view"
|
|
)
|
|
|
|
func createMockCluster(t *testing.T) *mock.Cluster {
|
|
var err error
|
|
m, err := mock.NewCluster()
|
|
require.NoError(t, err)
|
|
require.NoError(t, m.Start())
|
|
t.Cleanup(func() {
|
|
view.Stop()
|
|
m.Stop()
|
|
})
|
|
return m
|
|
}
|
|
|
|
func GetRandomStorage(t *testing.T) storeapi.Storage {
|
|
base := t.TempDir()
|
|
es, err := objstore.NewLocalStorage(base)
|
|
require.NoError(t, err)
|
|
return es
|
|
}
|
|
|
|
func GetSchemasFromMeta(t *testing.T, es storeapi.Storage) []*metautil.Table {
|
|
ctx := context.Background()
|
|
metaBytes, err := es.ReadFile(ctx, metautil.MetaFile)
|
|
require.NoError(t, err)
|
|
mockMeta := &backuppb.BackupMeta{}
|
|
err = proto.Unmarshal(metaBytes, mockMeta)
|
|
require.NoError(t, err)
|
|
metaReader := metautil.NewMetaReader(mockMeta,
|
|
es,
|
|
&backuppb.CipherInfo{
|
|
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
|
|
},
|
|
)
|
|
|
|
output := make(chan *metautil.Table, 4)
|
|
go func() {
|
|
err = metaReader.ReadSchemasFiles(ctx, output)
|
|
require.NoError(t, err)
|
|
close(output)
|
|
}()
|
|
|
|
schemas := make([]*metautil.Table, 0, 4)
|
|
for s := range output {
|
|
schemas = append(schemas, s)
|
|
}
|
|
return schemas
|
|
}
|
|
|
|
type simpleProgress struct {
|
|
counter int64
|
|
}
|
|
|
|
func (sp *simpleProgress) Inc() {
|
|
atomic.AddInt64(&sp.counter, 1)
|
|
}
|
|
|
|
// IncBy implements glue.Progress
|
|
func (sp *simpleProgress) IncBy(cnt int64) {
|
|
atomic.AddInt64(&sp.counter, cnt)
|
|
}
|
|
|
|
func (sp *simpleProgress) GetCurrent() int64 {
|
|
return 0
|
|
}
|
|
|
|
func (sp *simpleProgress) Close() {}
|
|
|
|
func (sp *simpleProgress) reset() {
|
|
atomic.StoreInt64(&sp.counter, 0)
|
|
}
|
|
|
|
func (sp *simpleProgress) get() int64 {
|
|
return atomic.LoadInt64(&sp.counter)
|
|
}
|
|
|
|
func TestBuildBackupRangeAndSchema(t *testing.T) {
|
|
m := createMockCluster(t)
|
|
|
|
tk := testkit.NewTestKit(t, m.Storage)
|
|
|
|
// Table t1 is not exist.
|
|
testFilter, err := filter.Parse([]string{"test.t1"})
|
|
require.NoError(t, err)
|
|
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(
|
|
m.Storage, testFilter, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, backupSchemas)
|
|
|
|
// Database is not exist.
|
|
fooFilter, err := filter.Parse([]string{"foo.t1"})
|
|
require.NoError(t, err)
|
|
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
|
|
m.Storage, fooFilter, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.Nil(t, backupSchemas)
|
|
|
|
// Empty database.
|
|
// Filter out system tables manually.
|
|
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*", "!sys.*"})
|
|
require.NoError(t, err)
|
|
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
|
|
m.Storage, noFilter, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, backupSchemas)
|
|
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t1;")
|
|
tk.MustExec("create table t1 (a int);")
|
|
tk.MustExec("insert into t1 values (10);")
|
|
tk.MustExec("create placement policy fivereplicas followers=4;")
|
|
|
|
var policies []*backuppb.PlacementPolicy
|
|
_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
|
|
m.Storage, testFilter, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, backupSchemas.Len())
|
|
// we expect no policies collected, because it's not full backup.
|
|
require.Equal(t, 0, len(policies))
|
|
updateCh := new(simpleProgress)
|
|
skipChecksum := false
|
|
es := GetRandomStorage(t)
|
|
cipher := backuppb.CipherInfo{
|
|
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
|
|
}
|
|
metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, "", &cipher)
|
|
ctx := context.Background()
|
|
err = backupSchemas.BackupSchemas(
|
|
ctx, metaWriter, nil, m.Storage, nil, math.MaxUint64, nil, 1, vardef.DefChecksumTableConcurrency, skipChecksum, updateCh)
|
|
require.Equal(t, int64(1), updateCh.get())
|
|
require.NoError(t, err)
|
|
err = metaWriter.FlushBackupMeta(ctx)
|
|
require.NoError(t, err)
|
|
|
|
schemas := GetSchemasFromMeta(t, es)
|
|
require.Len(t, schemas, 1)
|
|
// Cluster returns a dummy checksum (all fields are 1).
|
|
require.NotZerof(t, schemas[0].Crc64Xor, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalKvs, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalBytes, "%v", schemas[0])
|
|
|
|
tk.MustExec("drop table if exists t2;")
|
|
tk.MustExec("create table t2 (a int);")
|
|
tk.MustExec("insert into t2 values (10);")
|
|
tk.MustExec("insert into t2 values (11);")
|
|
|
|
_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
|
|
m.Storage, noFilter, math.MaxUint64, true)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, backupSchemas.Len())
|
|
// we expect the policy fivereplicas collected in full backup.
|
|
require.Equal(t, 1, len(policies))
|
|
updateCh.reset()
|
|
|
|
es2 := GetRandomStorage(t)
|
|
metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
|
|
err = backupSchemas.BackupSchemas(
|
|
ctx, metaWriter2, nil, m.Storage, nil, math.MaxUint64, nil, 2, vardef.DefChecksumTableConcurrency, skipChecksum, updateCh)
|
|
require.Equal(t, int64(2), updateCh.get())
|
|
require.NoError(t, err)
|
|
err = metaWriter2.FlushBackupMeta(ctx)
|
|
require.NoError(t, err)
|
|
|
|
schemas = GetSchemasFromMeta(t, es2)
|
|
|
|
require.Len(t, schemas, 2)
|
|
// Cluster returns a dummy checksum (all fields are 1).
|
|
require.NotZerof(t, schemas[0].Crc64Xor, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalKvs, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalBytes, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[1].Crc64Xor, "%v", schemas[1])
|
|
require.NotZerof(t, schemas[1].TotalKvs, "%v", schemas[1])
|
|
require.NotZerof(t, schemas[1].TotalBytes, "%v", schemas[1])
|
|
}
|
|
|
|
func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
|
|
m := createMockCluster(t)
|
|
|
|
tk := testkit.NewTestKit(t, m.Storage)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t3;")
|
|
tk.MustExec("create table t3 (a char(1));")
|
|
tk.MustExec("insert into t3 values ('1');")
|
|
tk.MustExec("analyze table t3 all columns;")
|
|
// corrupt the statistics like pingcap/br#679.
|
|
tk.MustExec(`
|
|
update mysql.stats_buckets set upper_bound = 0xffffffff
|
|
where table_id = (
|
|
select tidb_table_id from information_schema.tables
|
|
where (table_schema, table_name) = ('test', 't3')
|
|
);
|
|
`)
|
|
|
|
f, err := filter.Parse([]string{"test.t3"})
|
|
require.NoError(t, err)
|
|
|
|
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, backupSchemas.Len())
|
|
|
|
skipChecksum := false
|
|
updateCh := new(simpleProgress)
|
|
|
|
cipher := backuppb.CipherInfo{
|
|
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
|
|
}
|
|
|
|
es := GetRandomStorage(t)
|
|
metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, "", &cipher)
|
|
ctx := context.Background()
|
|
err = backupSchemas.BackupSchemas(
|
|
ctx, metaWriter, nil, m.Storage, nil, math.MaxUint64, nil, 1, vardef.DefChecksumTableConcurrency, skipChecksum, updateCh)
|
|
require.NoError(t, err)
|
|
err = metaWriter.FlushBackupMeta(ctx)
|
|
require.NoError(t, err)
|
|
|
|
schemas := GetSchemasFromMeta(t, es)
|
|
require.NoError(t, err)
|
|
require.Len(t, schemas, 1)
|
|
// the stats should be empty, but other than that everything should be backed up.
|
|
require.Nil(t, schemas[0].StatsFileIndexes)
|
|
require.NotZerof(t, schemas[0].Crc64Xor, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalKvs, "%v", schemas[0])
|
|
require.NotZerof(t, schemas[0].TotalBytes, "%v", schemas[0])
|
|
require.NotNil(t, schemas[0].Info)
|
|
require.NotNil(t, schemas[0].DB)
|
|
|
|
// recover the statistics.
|
|
tk.MustExec("analyze table t3 all columns;")
|
|
|
|
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, backupSchemas.Len())
|
|
|
|
updateCh.reset()
|
|
statsHandle := m.Domain.StatsHandle()
|
|
es2 := GetRandomStorage(t)
|
|
metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
|
|
err = backupSchemas.BackupSchemas(
|
|
ctx, metaWriter2, nil, m.Storage, statsHandle, math.MaxUint64, nil, 1, vardef.DefChecksumTableConcurrency, skipChecksum, updateCh)
|
|
require.NoError(t, err)
|
|
err = metaWriter2.FlushBackupMeta(ctx)
|
|
require.NoError(t, err)
|
|
|
|
schemas2 := GetSchemasFromMeta(t, es2)
|
|
require.Len(t, schemas2, 1)
|
|
// the stats should now be filled, and other than that the result should be equivalent to the first backup.
|
|
require.True(t, len(schemas2[0].StatsFileIndexes[0].InlineData) > 0 || len(schemas2[0].StatsFileIndexes[0].Name) > 0)
|
|
require.Equal(t, schemas[0].Crc64Xor, schemas2[0].Crc64Xor)
|
|
require.Equal(t, schemas[0].TotalKvs, schemas2[0].TotalKvs)
|
|
require.Equal(t, schemas[0].TotalBytes, schemas2[0].TotalBytes)
|
|
require.Equal(t, schemas[0].Info, schemas2[0].Info)
|
|
require.Equal(t, schemas[0].DB, schemas2[0].DB)
|
|
}
|
|
|
|
func TestBackupSchemasForSystemTable(t *testing.T) {
|
|
m := createMockCluster(t)
|
|
|
|
tk := testkit.NewTestKit(t, m.Storage)
|
|
es2 := GetRandomStorage(t)
|
|
|
|
systemTablesCount := 32
|
|
tablePrefix := "systable"
|
|
tk.MustExec("use mysql")
|
|
for i := 1; i <= systemTablesCount; i++ {
|
|
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
|
|
tk.MustExec(query)
|
|
}
|
|
|
|
f, err := filter.Parse([]string{"mysql.systable*"})
|
|
require.NoError(t, err)
|
|
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false)
|
|
require.NoError(t, err)
|
|
require.Equal(t, systemTablesCount, backupSchemas.Len())
|
|
|
|
ctx := context.Background()
|
|
cipher := backuppb.CipherInfo{
|
|
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
|
|
}
|
|
updateCh := new(simpleProgress)
|
|
|
|
metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
|
|
err = backupSchemas.BackupSchemas(ctx, metaWriter2, nil, m.Storage, nil,
|
|
math.MaxUint64, nil, 1, vardef.DefChecksumTableConcurrency, true, updateCh)
|
|
require.NoError(t, err)
|
|
err = metaWriter2.FlushBackupMeta(ctx)
|
|
require.NoError(t, err)
|
|
|
|
schemas2 := GetSchemasFromMeta(t, es2)
|
|
require.Len(t, schemas2, systemTablesCount)
|
|
for _, schema := range schemas2 {
|
|
require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name)
|
|
require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix))
|
|
}
|
|
}
|