295 lines
8.1 KiB
Go
295 lines
8.1 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package task
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/kvproto/pkg/encryptionpb"
|
|
"github.com/pingcap/kvproto/pkg/metapb"
|
|
"github.com/pingcap/tidb/br/pkg/conn"
|
|
"github.com/pingcap/tidb/br/pkg/metautil"
|
|
snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client"
|
|
"github.com/pingcap/tidb/br/pkg/utils"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/objstore"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/statistics/util"
|
|
"github.com/pingcap/tidb/pkg/tablecodec"
|
|
"github.com/stretchr/testify/require"
|
|
pd "github.com/tikv/pd/client"
|
|
"github.com/tikv/pd/client/opt"
|
|
"github.com/tikv/pd/client/pkg/caller"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
func TestRestoreConfigAdjust(t *testing.T) {
|
|
cfg := &RestoreConfig{}
|
|
cfg.Adjust()
|
|
|
|
require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency)
|
|
require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval)
|
|
require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount.Value)
|
|
require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes.Value)
|
|
}
|
|
|
|
// mockPDClient is a minimal PD client stub for tests. It embeds the real
// pd.Client interface for method-set completeness and overrides only the
// methods the tests below actually exercise; calling anything else will
// panic on the nil embedded interface.
type mockPDClient struct {
	pd.Client
}
|
|
|
|
func (m mockPDClient) GetClusterID(_ context.Context) uint64 {
|
|
return 1
|
|
}
|
|
|
|
func (m mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
|
|
return []*metapb.Store{}, nil
|
|
}
|
|
|
|
func (m mockPDClient) WithCallerComponent(_ caller.Component) pd.Client {
|
|
return m
|
|
}
|
|
|
|
func TestConfigureRestoreClient(t *testing.T) {
|
|
cfg := Config{
|
|
Concurrency: 1024,
|
|
}
|
|
restoreComCfg := RestoreCommonConfig{
|
|
Online: true,
|
|
}
|
|
restoreCfg := &RestoreConfig{
|
|
Config: cfg,
|
|
RestoreCommonConfig: restoreComCfg,
|
|
DdlBatchSize: 128,
|
|
}
|
|
client := snapclient.NewRestoreClient(mockPDClient{}, nil, nil, keepalive.ClientParameters{})
|
|
ctx := context.Background()
|
|
err := configureRestoreClient(ctx, client, restoreCfg)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint(128), client.GetBatchDdlSize())
|
|
}
|
|
|
|
func TestAdjustRestoreConfigForStreamRestore(t *testing.T) {
|
|
restoreCfg := RestoreConfig{}
|
|
|
|
restoreCfg.adjustRestoreConfigForStreamRestore()
|
|
require.Equal(t, restoreCfg.PitrBatchCount, uint32(defaultPiTRBatchCount))
|
|
require.Equal(t, restoreCfg.PitrBatchSize, uint32(defaultPiTRBatchSize))
|
|
require.Equal(t, restoreCfg.PitrConcurrency, uint32(defaultPiTRConcurrency)+1)
|
|
}
|
|
|
|
// TestCheckRestoreDBAndTable verifies that VerifyDBAndTableInBackup accepts
// restore filters that match the backup contents, including case-insensitive
// db/table name matching and system databases that the backup stores under
// the "__TiDB_BR_Temporary_" prefix.
func TestCheckRestoreDBAndTable(t *testing.T) {
	cases := []struct {
		// cfgSchemas/cfgTables mimic the user-supplied restore filters,
		// already quoted via utils.EncloseName / utils.EncloseDBAndTable.
		cfgSchemas map[string]struct{}
		cfgTables  map[string]struct{}
		// backupDBs is what the backup meta claims to contain, built by
		// mockReadSchemasFromBackupMeta from a db -> tables map.
		backupDBs map[string]*metautil.Database
	}{
		// Lower-case filter names vs. upper-case table names in the backup.
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("test"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("test", "t"):  {},
				utils.EncloseDBAndTable("test", "t2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"test": {"T", "T2"},
			}),
		},
		// System database "mysql" is stored in the backup under the
		// temporary prefix; the filter still refers to plain "mysql".
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("mysql"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("mysql", "t"):  {},
				utils.EncloseDBAndTable("mysql", "t2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"__TiDB_BR_Temporary_mysql": {"T", "T2"},
			}),
		},
		// Upper-case filter table names vs. lower-case backup names.
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("test"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("test", "T"):  {},
				utils.EncloseDBAndTable("test", "T2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"test": {"t", "t2"},
			}),
		},
		// Upper-case database name in the filter.
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("TEST"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("TEST", "t"):  {},
				utils.EncloseDBAndTable("TEST", "T2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"test": {"t", "t2"},
			}),
		},
		// Mixed case on both sides of the comparison.
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("TeSt"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("TeSt", "tabLe"):  {},
				utils.EncloseDBAndTable("TeSt", "taBle2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"TesT": {"TablE", "taBle2"},
			}),
		},
		// Multiple databases at once: a user db plus the prefixed "mysql".
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("TeSt"):  {},
				utils.EncloseName("MYSQL"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("TeSt", "tabLe"):  {},
				utils.EncloseDBAndTable("TeSt", "taBle2"): {},
				utils.EncloseDBAndTable("MYSQL", "taBle"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"TesT":                      {"table", "TaBLE2"},
				"__TiDB_BR_Temporary_mysql": {"tablE"},
			}),
		},
		// "sys" is also stored under the temporary prefix in the backup.
		{
			cfgSchemas: map[string]struct{}{
				utils.EncloseName("sys"): {},
			},
			cfgTables: map[string]struct{}{
				utils.EncloseDBAndTable("sys", "t"):  {},
				utils.EncloseDBAndTable("sys", "t2"): {},
			},
			backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{
				"__TiDB_BR_Temporary_sys": {"T", "T2"},
			}),
		},
	}

	cfg := &RestoreConfig{}
	for _, ca := range cases {
		cfg.Schemas = ca.cfgSchemas
		cfg.Tables = ca.cfgTables

		// Flatten the map of databases into the slice form the verifier takes.
		backupDBs := make([]*metautil.Database, 0, len(ca.backupDBs))
		for _, db := range ca.backupDBs {
			backupDBs = append(backupDBs, db)
		}
		// Every case above is expected to pass verification.
		err := VerifyDBAndTableInBackup(backupDBs, cfg)
		require.NoError(t, err)
	}
}
|
|
|
|
// mockReadSchemasFromBackupMeta builds a fake backup in a temp-dir local
// storage from a db-name -> table-names map, then loads it back with
// metautil.LoadBackupTables and returns the resulting databases keyed the
// way LoadBackupTables keys them. Any failure aborts the calling test.
func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*metautil.Database {
	testDir := t.TempDir()
	store, err := objstore.NewLocalStorage(testDir)
	require.NoError(t, err)

	mockSchemas := make([]*backuppb.Schema, 0)
	// Databases get sequential IDs starting at 1; note that map iteration
	// order is random, so the db -> ID assignment is nondeterministic
	// (harmless here, the tests never rely on specific IDs).
	var dbID int64 = 1
	for db, tables := range db2Tables {
		dbName := ast.NewCIStr(db)
		mockTblList := make([]*model.TableInfo, 0)
		tblBytesList, statsBytesList := make([][]byte, 0), make([][]byte, 0)

		for i, table := range tables {
			tblName := ast.NewCIStr(table)
			mockTbl := &model.TableInfo{
				// Table IDs are derived from the db ID so they stay unique
				// across databases (e.g. db 1 gets 100, 101, ...).
				ID:   dbID*100 + int64(i),
				Name: tblName,
			}
			mockTblList = append(mockTblList, mockTbl)

			// Minimal stats JSON so the schema carries a stats payload.
			mockStats := util.JSONTable{
				DatabaseName: dbName.String(),
				TableName:    tblName.String(),
			}

			tblBytes, err := json.Marshal(mockTbl)
			require.NoError(t, err)
			tblBytesList = append(tblBytesList, tblBytes)

			statsBytes, err := json.Marshal(mockStats)
			require.NoError(t, err)
			statsBytesList = append(statsBytesList, statsBytes)
		}

		mockDB := model.DBInfo{
			ID:   dbID,
			Name: dbName,
		}
		// Deprecated.Tables is where DBInfo still carries its table list.
		mockDB.Deprecated.Tables = mockTblList
		dbID++
		dbBytes, err := json.Marshal(mockDB)
		require.NoError(t, err)

		// One backuppb.Schema entry per table, each repeating the db blob.
		for i := range tblBytesList {
			mockSchemas = append(mockSchemas, &backuppb.Schema{
				Db:    dbBytes,
				Table: tblBytesList[i],
				Stats: statsBytesList[i],
			},
			)
		}
	}

	// A single dummy SST file spanning table IDs 1..2; the %p of the slice
	// header just makes the file name unique per invocation.
	mockFiles := []*backuppb.File{
		{
			Name:     fmt.Sprintf("%p.sst", &mockSchemas),
			StartKey: tablecodec.EncodeRowKey(1, []byte("a")),
			EndKey:   tablecodec.EncodeRowKey(2, []byte("a")),
		},
	}

	meta := mockBackupMeta(mockSchemas, mockFiles)
	data, err := proto.Marshal(meta)
	require.NoError(t, err)

	// Persist the meta so the reader below can load it from storage.
	ctx := context.Background()
	err = store.WriteFile(ctx, metautil.MetaFile, data)
	require.NoError(t, err)

	// Read the backup back with plaintext "encryption" (no cipher).
	dbs, err := metautil.LoadBackupTables(
		ctx,
		metautil.NewMetaReader(
			meta,
			store,
			&backuppb.CipherInfo{
				CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
			}),
		true,
	)
	require.NoError(t, err)
	return dbs
}
|
|
|
|
func mockBackupMeta(mockSchemas []*backuppb.Schema, mockFiles []*backuppb.File) *backuppb.BackupMeta {
|
|
return &backuppb.BackupMeta{
|
|
Files: mockFiles,
|
|
Schemas: mockSchemas,
|
|
}
|
|
}
|