Files
tidb/br/pkg/task/restore_test.go

1025 lines
29 KiB
Go

// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package task_test
import (
"context"
"encoding/json"
"math"
"strconv"
"testing"
"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/gluetidb"
gluemock "github.com/pingcap/tidb/br/pkg/gluetidb/mock"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utiltest"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pdhttp "github.com/tikv/pd/client/http"
)
const pb uint64 = units.PB
func TestPreCheckTableTiFlashReplicas(t *testing.T) {
mockStores := []*metapb.Store{
{
Id: 1,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
{
Id: 2,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
}
pdClient := split.NewFakePDClient(mockStores, false, nil)
tables := make([]*metautil.Table, 4)
for i := range tables {
tiflashReplica := &model.TiFlashReplicaInfo{
Count: uint64(i),
}
if i == 0 {
tiflashReplica = nil
}
tables[i] = &metautil.Table{
DB: &model.DBInfo{Name: ast.NewCIStr("test")},
Info: &model.TableInfo{
ID: int64(i),
Name: ast.NewCIStr("test" + strconv.Itoa(i)),
TiFlashReplica: tiflashReplica,
},
}
}
ctx := context.Background()
require.Nil(t, task.PreCheckTableTiFlashReplica(ctx, pdClient, tables, nil, false))
for i := range tables {
if i == 0 || i > 2 {
require.Nil(t, tables[i].Info.TiFlashReplica)
} else {
require.NotNil(t, tables[i].Info.TiFlashReplica)
obtainCount := int(tables[i].Info.TiFlashReplica.Count)
require.Equal(t, i, obtainCount)
}
}
require.Nil(t, task.PreCheckTableTiFlashReplica(ctx, pdClient, tables, tiflashrec.New(), false))
for i := range tables {
require.Nil(t, tables[i].Info.TiFlashReplica)
}
}
func TestPreCheckTableClusterIndex(t *testing.T) {
ctx := context.Background()
m, err := mock.NewCluster()
if err != nil {
panic(err)
}
err = m.Start()
if err != nil {
panic(err)
}
defer m.Stop()
g := gluetidb.New()
se, err := g.CreateSession(m.Storage)
require.NoError(t, err)
info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64)
require.NoError(t, err)
dbSchema, isExist := info.SchemaByName(ast.NewCIStr("test"))
require.True(t, isExist)
tables := make([]*metautil.Table, 4)
intField := types.NewFieldType(mysql.TypeLong)
intField.SetCharset("binary")
for i := len(tables) - 1; i >= 0; i-- {
tables[i] = &metautil.Table{
DB: dbSchema,
Info: &model.TableInfo{
ID: int64(i),
Name: ast.NewCIStr("test" + strconv.Itoa(i)),
Columns: []*model.ColumnInfo{{
ID: 1,
Name: ast.NewCIStr("id"),
FieldType: *intField,
State: model.StatePublic,
}},
Charset: "utf8mb4",
Collate: "utf8mb4_bin",
},
}
err = se.CreateTable(ctx, tables[i].DB.Name, tables[i].Info, ddl.WithOnExist(ddl.OnExistIgnore))
require.NoError(t, err)
}
// exist different tables
tables[1].Info.IsCommonHandle = true
err = task.PreCheckTableClusterIndex(tables, nil, m.Domain)
require.Error(t, err)
require.Regexp(t, `.*@@tidb_enable_clustered_index should be ON \(backup table = true, created table = false\).*`, err.Error())
// exist different DDLs
jobs := []*model.Job{{
ID: 5,
Type: model.ActionCreateTable,
SchemaName: "test",
Query: "",
BinlogInfo: &model.HistoryInfo{
TableInfo: &model.TableInfo{
Name: ast.NewCIStr("test1"),
IsCommonHandle: true,
},
},
}}
err = task.PreCheckTableClusterIndex(nil, jobs, m.Domain)
require.Error(t, err)
require.Regexp(t, `.*@@tidb_enable_clustered_index should be ON \(backup table = true, created table = false\).*`, err.Error())
// should pass pre-check cluster index
tables[1].Info.IsCommonHandle = false
jobs[0].BinlogInfo.TableInfo.IsCommonHandle = false
require.Nil(t, task.PreCheckTableClusterIndex(tables, jobs, m.Domain))
}
func TestCheckNewCollationEnable(t *testing.T) {
caseList := []struct {
backupMeta *backuppb.BackupMeta
newCollationEnableInCluster string
CheckRequirements bool
isErr bool
}{
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"},
newCollationEnableInCluster: "True",
CheckRequirements: true,
isErr: false,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"},
newCollationEnableInCluster: "False",
CheckRequirements: true,
isErr: true,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"},
newCollationEnableInCluster: "True",
CheckRequirements: true,
isErr: true,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"},
newCollationEnableInCluster: "false",
CheckRequirements: true,
isErr: false,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"},
newCollationEnableInCluster: "True",
CheckRequirements: false,
isErr: true,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"},
newCollationEnableInCluster: "False",
CheckRequirements: false,
isErr: true,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""},
newCollationEnableInCluster: "True",
CheckRequirements: false,
isErr: false,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""},
newCollationEnableInCluster: "True",
CheckRequirements: true,
isErr: true,
},
{
backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""},
newCollationEnableInCluster: "False",
CheckRequirements: false,
isErr: false,
},
}
for i, ca := range caseList {
g := &gluemock.MockGlue{
GlobalVars: map[string]string{"new_collation_enabled": ca.newCollationEnableInCluster},
}
enabled, err := task.CheckNewCollationEnable(ca.backupMeta.GetNewCollationsEnabled(), g, nil, ca.CheckRequirements)
t.Logf("[%d] Got Error: %v\n", i, err)
if ca.isErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}
func TestFilterDDLJobs(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
tk := testkit.NewTestKit(t, s.Mock.Storage)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTS, err := s.Mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("TRUNCATE TABLE test_table;")
ts, err := s.Mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get ts: %s", err)
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
metaWriter := metautil.NewMetaWriter(s.Storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
s.MockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.MockGlue, s.Mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
err = metaWriter.FlushBackupMeta(ctx)
require.NoErrorf(t, err, "Finially flush backupmeta failed", err)
infoSchema, err := s.Mock.Domain.GetSnapshotInfoSchema(ts)
require.NoErrorf(t, err, "Error get snapshot info schema: %s", err)
dbInfo, ok := infoSchema.SchemaByName(ast.NewCIStr("test_db"))
require.Truef(t, ok, "DB info not exist")
tableInfo, err := infoSchema.TableByName(context.Background(), ast.NewCIStr("test_db"), ast.NewCIStr("test_table"))
require.NoErrorf(t, err, "Error get table info: %s", err)
tables := []*metautil.Table{{
DB: dbInfo,
Info: tableInfo.Meta(),
}}
metaBytes, err := s.Storage.ReadFile(ctx, metautil.MetaFile)
require.NoError(t, err)
mockMeta := &backuppb.BackupMeta{}
err = proto.Unmarshal(metaBytes, mockMeta)
require.NoError(t, err)
// check the schema version
require.Equal(t, int32(metautil.MetaV1), mockMeta.Version)
metaReader := metautil.NewMetaReader(mockMeta, s.Storage, &cipher)
allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
require.NoError(t, err)
var allDDLJobs []*model.Job
err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
require.NoError(t, err)
ddlJobs := task.FilterDDLJobs(allDDLJobs, tables)
for _, job := range ddlJobs {
t.Logf("get ddl job: %s", job.Query)
}
require.Equal(t, 7, len(ddlJobs))
}
func TestFilterDDLJobsV2(t *testing.T) {
s := utiltest.CreateRestoreSchemaSuite(t)
tk := testkit.NewTestKit(t, s.Mock.Storage)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTS, err := s.Mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("TRUNCATE TABLE test_table;")
ts, err := s.Mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get ts: %s", err)
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
metaWriter := metautil.NewMetaWriter(s.Storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
s.MockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.MockGlue, s.Mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
err = metaWriter.FlushBackupMeta(ctx)
require.NoErrorf(t, err, "Flush BackupMeta failed", err)
infoSchema, err := s.Mock.Domain.GetSnapshotInfoSchema(ts)
require.NoErrorf(t, err, "Error get snapshot info schema: %s", err)
dbInfo, ok := infoSchema.SchemaByName(ast.NewCIStr("test_db"))
require.Truef(t, ok, "DB info not exist")
tableInfo, err := infoSchema.TableByName(context.Background(), ast.NewCIStr("test_db"), ast.NewCIStr("test_table"))
require.NoErrorf(t, err, "Error get table info: %s", err)
tables := []*metautil.Table{{
DB: dbInfo,
Info: tableInfo.Meta(),
}}
metaBytes, err := s.Storage.ReadFile(ctx, metautil.MetaFile)
require.NoError(t, err)
mockMeta := &backuppb.BackupMeta{}
err = proto.Unmarshal(metaBytes, mockMeta)
require.NoError(t, err)
// check the schema version
require.Equal(t, int32(metautil.MetaV2), mockMeta.Version)
metaReader := metautil.NewMetaReader(mockMeta, s.Storage, &cipher)
allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
require.NoError(t, err)
var allDDLJobs []*model.Job
err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
require.NoError(t, err)
ddlJobs := task.FilterDDLJobs(allDDLJobs, tables)
for _, job := range ddlJobs {
t.Logf("get ddl job: %s", job.Query)
}
require.Equal(t, 7, len(ddlJobs))
}
func TestFilterDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}
expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}
ddlJobs = task.FilterDDLJobByRules(ddlJobs, task.DDLJobBlockListRule)
require.Equal(t, len(expectedDDLTypes), len(ddlJobs))
for i, ddlJob := range ddlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
}
}
func TestCheckDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
{
Type: model.ActionReorganizePartition,
},
}
filteredDDlJobs := task.FilterDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule)
expectedDDLTypes := []model.ActionType{
model.ActionSetTiFlashReplica,
model.ActionAddPrimaryKey,
model.ActionUpdateTiFlashReplicaStatus,
model.ActionCreateTable,
model.ActionLockTable,
model.ActionUnlockTable,
model.ActionCreateSchema,
}
require.Equal(t, len(expectedDDLTypes), len(filteredDDlJobs))
expectedDDLJobs := make([]*model.Job, 0, len(expectedDDLTypes))
for i, ddlJob := range filteredDDlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
expectedDDLJobs = append(expectedDDLJobs, ddlJob)
}
require.NoError(t, task.CheckDDLJobByRules(expectedDDLJobs, task.DDLJobLogIncrementalCompactBlockListRule))
require.Error(t, task.CheckDDLJobByRules(ddlJobs, task.DDLJobLogIncrementalCompactBlockListRule))
}
// NOTICE: Once there is a new backfilled type ddl, BR needs to ensure that it is correctly cover by the rules:
func TestMonitorTheIncrementalUnsupportDDLType(t *testing.T) {
require.Equal(t, int(5), ddl.BackupFillerTypeCount())
}
func TestTikvUsage(t *testing.T) {
files := []*backuppb.File{
{Name: "F1", Size_: 1 * pb},
{Name: "F2", Size_: 2 * pb},
{Name: "F3", Size_: 3 * pb},
{Name: "F4", Size_: 4 * pb},
{Name: "F5", Size_: 5 * pb},
}
replica := uint64(3)
storeCnt := uint64(6)
total := uint64(0)
for _, f := range files {
total += f.GetSize_()
}
ret := task.EstimateTikvUsage(total, replica, storeCnt)
require.Equal(t, 15*pb*3/6, ret)
}
func TestTiflashUsage(t *testing.T) {
tables := []*metautil.Table{
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 0}},
FilesOfPhysicals: map[int64][]*backuppb.File{1: {{Size_: 1 * pb}}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 1}},
FilesOfPhysicals: map[int64][]*backuppb.File{2: {{Size_: 2 * pb}}}},
{Info: &model.TableInfo{TiFlashReplica: &model.TiFlashReplicaInfo{Count: 2}},
FilesOfPhysicals: map[int64][]*backuppb.File{3: {{Size_: 3 * pb}}}},
}
var storeCnt uint64 = 3
ret := task.EstimateTiflashUsage(tables, storeCnt)
require.Equal(t, 8*pb/3, ret)
}
func TestCheckTikvSpace(t *testing.T) {
store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}}
require.NoError(t, task.CheckStoreSpace(400*pb, &store))
}
func generateDBMap(snapshotDBMap map[int64]*metautil.Database) map[int64]*metautil.Database {
m := make(map[int64]*metautil.Database)
for id, dbinfo := range snapshotDBMap {
clonedTables := make([]*metautil.Table, 0, len(dbinfo.Tables))
dbInfo := dbinfo.Info.Clone()
for _, tableInfo := range dbinfo.Tables {
clonedTables = append(clonedTables, &metautil.Table{
DB: dbInfo,
Info: tableInfo.Info.Clone(),
})
}
m[id] = &metautil.Database{
Info: dbInfo,
Tables: clonedTables,
}
}
return m
}
func generateTableMap(snapshotDBMap map[int64]*metautil.Database) map[int64]*metautil.Table {
m := make(map[int64]*metautil.Table)
for _, dbinfo := range snapshotDBMap {
for _, tableInfo := range dbinfo.Tables {
m[tableInfo.Info.ID] = &metautil.Table{
DB: tableInfo.DB.Clone(),
Info: tableInfo.Info.Clone(),
FilesOfPhysicals: map[int64][]*backuppb.File{
tableInfo.Info.ID: {{Name: tableInfo.Info.Name.O}},
},
}
}
}
return m
}
func TestAdjustTablesToRestoreAndCreateTableTracker(t *testing.T) {
// Create a mock snapshot database map
snapshotDBMap := map[int64]*metautil.Database{
1: {
Info: &model.DBInfo{
ID: 1,
Name: ast.NewCIStr("test_db_1"),
},
Tables: []*metautil.Table{
{
DB: &model.DBInfo{
ID: 1,
Name: ast.NewCIStr("test_db_1"),
},
Info: &model.TableInfo{
ID: 11,
Name: ast.NewCIStr("test_table_11"),
},
FilesOfPhysicals: map[int64][]*backuppb.File{
11: {{Name: "test_table_11"}},
},
},
{
DB: &model.DBInfo{
ID: 1,
Name: ast.NewCIStr("test_db_1"),
},
Info: &model.TableInfo{
ID: 12,
Name: ast.NewCIStr("test_table_12"),
},
FilesOfPhysicals: map[int64][]*backuppb.File{
12: {{Name: "test_table_12"}},
},
},
},
},
2: {
Info: &model.DBInfo{
ID: 2,
Name: ast.NewCIStr("test_db_2"),
},
Tables: []*metautil.Table{
{
DB: &model.DBInfo{
ID: 2,
Name: ast.NewCIStr("test_db_2"),
},
Info: &model.TableInfo{
ID: 21,
Name: ast.NewCIStr("test_table_21"),
},
FilesOfPhysicals: map[int64][]*backuppb.File{
21: {{Name: "test_table_21"}},
},
},
},
},
}
tests := []struct {
name string
filterPattern []string
logBackupHistory []struct {
tableID int64
tableName string
dbID int64
}
dbIDToName map[int64]string
snapshotDBMap map[int64]*metautil.Database
snapshotTableMap map[int64]*metautil.Table
snapshotFileMap map[int64][]*backuppb.File
expectedTableIDs map[int64][]int64
expectedDBs []int64
expectedTables []int64
expectedFileMap map[int64][]*backuppb.File
}{
{
name: "No filter",
filterPattern: []string{"*.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
snapshotDBMap: generateDBMap(snapshotDBMap),
snapshotTableMap: generateTableMap(snapshotDBMap),
expectedTableIDs: map[int64][]int64{
1: {11, 12},
2: {21},
},
expectedDBs: []int64{1, 2},
expectedTables: []int64{11, 12, 21},
},
{
name: "Filter by database",
filterPattern: []string{"test_db_1.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
expectedTableIDs: map[int64][]int64{
1: {11, 12},
},
expectedDBs: []int64{1},
expectedTables: []int64{11, 12},
},
{
name: "Filter by table",
filterPattern: []string{"*.test_table_11"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
expectedTableIDs: map[int64][]int64{
1: {11},
},
expectedDBs: []int64{1, 2},
expectedTables: []int64{11},
},
{
name: "Table renamed during log backup",
filterPattern: []string{"*.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{11, "renamed_table", 1},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
snapshotDBMap: generateDBMap(snapshotDBMap),
snapshotTableMap: generateTableMap(snapshotDBMap),
expectedTableIDs: map[int64][]int64{
1: {11, 12},
2: {21},
},
expectedDBs: []int64{1, 2},
expectedTables: []int64{11, 12, 21}, // 13 not in full backup
},
{
name: "Table renamed to different database during log backup",
filterPattern: []string{"test_db_2.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{11, "renamed_table", 2},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
snapshotDBMap: generateDBMap(snapshotDBMap),
snapshotTableMap: generateTableMap(snapshotDBMap),
expectedTableIDs: map[int64][]int64{
2: {11, 21},
},
expectedDBs: []int64{1, 2},
expectedTables: []int64{11, 21},
},
{
name: "Table renamed out of filter during log backup",
filterPattern: []string{"test_db_1.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table_11", 1},
{11, "renamed_table", 2},
{12, "test_table_12", 1},
{21, "test_table_21", 2},
},
snapshotDBMap: generateDBMap(map[int64]*metautil.Database{1: snapshotDBMap[1]}),
snapshotTableMap: generateTableMap(map[int64]*metautil.Database{1: snapshotDBMap[1]}),
expectedTableIDs: map[int64][]int64{
1: {12},
},
expectedDBs: []int64{1},
expectedTables: []int64{12},
},
{
name: "Log backup table not in snapshot",
filterPattern: []string{"test_db_1.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table", 1},
},
// using empty snapshotDBMap for this test
expectedTableIDs: map[int64][]int64{},
expectedDBs: []int64{},
expectedTables: []int64{},
},
{
name: "DB created during log backup",
filterPattern: []string{"test_db_1.*"},
logBackupHistory: []struct {
tableID int64
tableName string
dbID int64
}{
{11, "test_table", 1},
},
dbIDToName: map[int64]string{
1: "test_db_1",
},
// using empty snapshotDBMap for this test
expectedTableIDs: map[int64][]int64{
1: {11},
},
expectedDBs: []int64{}, // not in full backup
expectedTables: []int64{}, // not in full backup
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// use local snapshotDBMap for special cases that need empty map
localSnapshotDBMap := snapshotDBMap
if tc.name == "Log backup table not in snapshot" || tc.name == "DB created during log backup" {
localSnapshotDBMap = map[int64]*metautil.Database{}
}
snapTableMap := tc.snapshotTableMap
if snapTableMap == nil {
snapTableMap = generateTableMap(localSnapshotDBMap)
}
logBackupTableHistory := stream.NewTableHistoryManager()
for _, h := range tc.logBackupHistory {
logBackupTableHistory.AddTableHistory(h.tableID, h.tableName, h.dbID, 1)
}
for dbID, dbName := range tc.dbIDToName {
logBackupTableHistory.RecordDBIdToName(dbID, dbName, 1)
}
testFilter, err := filter.Parse(tc.filterPattern)
require.NoError(t, err)
cfg := &task.RestoreConfig{
Config: task.Config{
TableFilter: testFilter,
},
}
// Create empty partition map for the test
partitionMap := make(map[int64]*stream.TableLocationInfo)
currentTableMap := make(map[int64]*metautil.Table)
currentDBMap := make(map[int64]*metautil.Database)
filterSnapshotMaps(testFilter, localSnapshotDBMap, currentDBMap, currentTableMap)
// Run the function
err = task.AdjustTablesToRestoreAndCreateTableTracker(
logBackupTableHistory,
cfg,
localSnapshotDBMap,
snapTableMap,
partitionMap,
currentTableMap,
currentDBMap,
)
require.NoError(t, err)
for dbID, tableIDs := range tc.expectedTableIDs {
for _, tableID := range tableIDs {
require.True(t, cfg.PiTRTableTracker.ContainsDBAndTableId(dbID, tableID))
}
}
require.Len(t, currentDBMap, len(tc.expectedDBs))
for _, dbID := range tc.expectedDBs {
require.NotNil(t, currentDBMap[dbID])
}
require.Len(t, currentTableMap, len(tc.expectedTables))
for _, tableID := range tc.expectedTables {
require.NotNil(t, currentTableMap[tableID])
}
})
}
}
func TestHash(t *testing.T) {
baseConfig := task.RestoreConfig{
UpstreamClusterID: 1,
Config: task.Config{
Storage: "default-storage",
ExplicitFilter: true,
FilterStr: []string{"filter1", "filter2"},
},
RestoreCommonConfig: task.RestoreCommonConfig{
WithSysTable: true,
},
}
testCases := []struct {
name string
modifyFunc func(*task.RestoreConfig)
changeTask bool
expectEqual bool
}{
{
name: "identical_configuration",
modifyFunc: func(*task.RestoreConfig) {},
expectEqual: true,
},
{
name: "change_upstream_cluster_id",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.UpstreamClusterID = 999
},
expectEqual: false,
},
{
name: "change_storage",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.Config.Storage = "new-storage"
},
expectEqual: false,
},
{
name: "toggle_explicit_filter",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.Config.ExplicitFilter = !cfg.Config.ExplicitFilter
},
expectEqual: true,
},
{
name: "modify_filter_strings",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.Config.FilterStr = append(cfg.Config.FilterStr, "new-filter")
},
expectEqual: false,
},
{
name: "reorder_filter_strings",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.Config.FilterStr = []string{cfg.Config.FilterStr[1], cfg.Config.FilterStr[0]}
},
expectEqual: false,
},
{
name: "toggle_system_tables",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.WithSysTable = !cfg.WithSysTable
},
expectEqual: false,
},
{
name: "empty_vs_base_config",
modifyFunc: func(cfg *task.RestoreConfig) {
*cfg = task.RestoreConfig{}
},
expectEqual: false,
},
{
name: "multiple_changes",
modifyFunc: func(cfg *task.RestoreConfig) {
cfg.UpstreamClusterID = 2
cfg.Config.Storage = "multi-change-storage"
},
expectEqual: false,
},
{
name: "change_task_type",
changeTask: true,
modifyFunc: func(*task.RestoreConfig) {},
expectEqual: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
originalCfg := baseConfig
modifiedCfg := baseConfig
if tc.modifyFunc != nil {
tc.modifyFunc(&modifiedCfg)
}
var cfg1, cfg2 *task.RestoreConfig
if tc.name == "nil_configuration" {
cfg1 = &originalCfg
cfg2 = nil
} else {
cfg1 = &originalCfg
cfg2 = &modifiedCfg
}
taskType1 := "full"
taskType2 := "full"
if tc.changeTask {
taskType2 = "db"
}
hash1, err1 := cfg1.Hash(taskType1)
hash2, err2 := cfg2.Hash(taskType2)
if cfg2 == nil {
require.Error(t, err2, "Expected error for nil config")
return
}
require.NoError(t, err1)
require.NoError(t, err2)
if tc.expectEqual {
require.Equal(t, hash1, hash2, "Hashes should be equal for: %s", tc.name)
} else {
require.NotEqual(t, hash1, hash2, "Hashes should differ for: %s", tc.name)
}
})
}
}
// filterSnapshotMaps filters the snapshot maps based on the filter criteria and populates the current maps
func filterSnapshotMaps(
tableFilter filter.Filter,
snapDBMap map[int64]*metautil.Database,
currentDBMap map[int64]*metautil.Database,
currentTableMap map[int64]*metautil.Table,
) {
for dbID, db := range snapDBMap {
dbName := db.Info.Name.O
if tableFilter.MatchSchema(dbName) {
currentDBMap[dbID] = db
}
for _, table := range db.Tables {
tableID := table.Info.ID
tableName := table.Info.Name.O
if tableFilter.MatchTable(dbName, tableName) {
currentTableMap[tableID] = table
}
}
}
}