lightning: change the caching mechanism for pre-info-getter (#37591)
close pingcap/tidb#37583, close pingcap/tidb#37590
This commit is contained in:
@ -7,7 +7,6 @@ go_library(
|
||||
"check_template.go",
|
||||
"checksum.go",
|
||||
"get_pre_info.go",
|
||||
"get_pre_info_opts.go",
|
||||
"meta_manager.go",
|
||||
"precheck.go",
|
||||
"precheck_impl.go",
|
||||
@ -32,6 +31,7 @@ go_library(
|
||||
"//br/pkg/lightning/log",
|
||||
"//br/pkg/lightning/metric",
|
||||
"//br/pkg/lightning/mydump",
|
||||
"//br/pkg/lightning/restore/opts",
|
||||
"//br/pkg/lightning/tikv",
|
||||
"//br/pkg/lightning/verification",
|
||||
"//br/pkg/lightning/web",
|
||||
@ -116,6 +116,7 @@ go_test(
|
||||
"//br/pkg/lightning/metric",
|
||||
"//br/pkg/lightning/mydump",
|
||||
"//br/pkg/lightning/restore/mock",
|
||||
"//br/pkg/lightning/restore/opts",
|
||||
"//br/pkg/lightning/verification",
|
||||
"//br/pkg/lightning/web",
|
||||
"//br/pkg/lightning/worker",
|
||||
|
||||
@ -120,14 +120,6 @@ func (rc *Controller) HasLargeCSV(ctx context.Context) error {
|
||||
return rc.doPreCheckOnItem(ctx, CheckLargeDataFile)
|
||||
}
|
||||
|
||||
func (rc *Controller) estimateSourceData(ctx context.Context) (int64, int64, bool, error) {
|
||||
result, err := rc.preInfoGetter.EstimateSourceDataSize(ctx)
|
||||
if err != nil {
|
||||
return 0, 0, false, errors.Trace(err)
|
||||
}
|
||||
return result.SizeWithIndex, result.SizeWithoutIndex, result.HasUnsortedBigTables, nil
|
||||
}
|
||||
|
||||
// localResource checks the local node has enough resources for this import when local backend enabled;
|
||||
func (rc *Controller) localResource(ctx context.Context) error {
|
||||
if rc.isSourceInLocal() {
|
||||
|
||||
@ -413,7 +413,8 @@ func TestCheckCSVHeader(t *testing.T) {
|
||||
preInfoGetter,
|
||||
nil,
|
||||
)
|
||||
err = rc.checkCSVHeader(WithPreInfoGetterTableStructuresCache(ctx, rc.dbInfos))
|
||||
preInfoGetter.dbInfosCache = rc.dbInfos
|
||||
err = rc.checkCSVHeader(ctx)
|
||||
require.NoError(t, err)
|
||||
if ca.level != passed {
|
||||
require.Equal(t, 1, rc.checkTemplate.FailedCount(ca.level))
|
||||
@ -631,7 +632,8 @@ func TestLocalResource(t *testing.T) {
|
||||
}
|
||||
|
||||
estimatedSizeResult := new(EstimateSourceDataSizeResult)
|
||||
ctx := WithPreInfoGetterEstimatedSrcSizeCache(context.Background(), estimatedSizeResult)
|
||||
preInfoGetter.estimatedSizeCache = estimatedSizeResult
|
||||
ctx := context.Background()
|
||||
// 1. source-size is smaller than disk-size, won't trigger error information
|
||||
rc.checkTemplate = NewSimpleTemplate()
|
||||
estimatedSizeResult.SizeWithIndex = 1000
|
||||
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/glue"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/log"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/verification"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/worker"
|
||||
"github.com/pingcap/tidb/br/pkg/storage"
|
||||
@ -68,7 +69,7 @@ type EstimateSourceDataSizeResult struct {
|
||||
type PreRestoreInfoGetter interface {
|
||||
TargetInfoGetter
|
||||
// GetAllTableStructures gets all the table structures with the information from both the source and the target.
|
||||
GetAllTableStructures(ctx context.Context) (map[string]*checkpoints.TidbDBInfo, error)
|
||||
GetAllTableStructures(ctx context.Context, opts ...ropts.GetPreInfoOption) (map[string]*checkpoints.TidbDBInfo, error)
|
||||
// ReadFirstNRowsByTableName reads the first N rows of data of an importing source table.
|
||||
ReadFirstNRowsByTableName(ctx context.Context, schemaName string, tableName string, n int) (cols []string, rows [][]types.Datum, err error)
|
||||
// ReadFirstNRowsByFileMeta reads the first N rows of an data file.
|
||||
@ -79,7 +80,7 @@ type PreRestoreInfoGetter interface {
|
||||
// which might include some extra index data to generate besides the source file data
|
||||
// * the total data size of all the source files,
|
||||
// * whether there are some unsorted big tables
|
||||
EstimateSourceDataSize(ctx context.Context) (*EstimateSourceDataSizeResult, error)
|
||||
EstimateSourceDataSize(ctx context.Context, opts ...ropts.GetPreInfoOption) (*EstimateSourceDataSizeResult, error)
|
||||
}
|
||||
|
||||
// TargetInfoGetter defines the operations to get information from target.
|
||||
@ -91,7 +92,7 @@ type TargetInfoGetter interface {
|
||||
// IsTableEmpty checks whether the specified table on the target DB contains data or not.
|
||||
IsTableEmpty(ctx context.Context, schemaName string, tableName string) (*bool, error)
|
||||
// GetTargetSysVariablesForImport gets some important systam variables for importing on the target.
|
||||
GetTargetSysVariablesForImport(ctx context.Context) map[string]string
|
||||
GetTargetSysVariablesForImport(ctx context.Context, opts ...ropts.GetPreInfoOption) map[string]string
|
||||
// GetReplicationConfig gets the replication config on the target.
|
||||
GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error)
|
||||
// GetStorageInfo gets the storage information on the target.
|
||||
@ -103,28 +104,13 @@ type TargetInfoGetter interface {
|
||||
type preInfoGetterKey string
|
||||
|
||||
const (
|
||||
preInfoGetterKeyDBMetas preInfoGetterKey = "PRE_INFO_GETTER/DB_METAS"
|
||||
preInfoGetterKeyTableStructsCache preInfoGetterKey = "PRE_INFO_GETTER/TABLE_STRUCTS_CACHE"
|
||||
preInfoGetterKeySysVarsCache preInfoGetterKey = "PRE_INFO_GETTER/SYS_VARS_CACHE"
|
||||
preInfoGetterKeyEstimatedSourceSizeCache preInfoGetterKey = "PRE_INFO_GETTER/ESTIMATED_SOURCE_SIZE_CACHE"
|
||||
preInfoGetterKeyDBMetas preInfoGetterKey = "PRE_INFO_GETTER/DB_METAS"
|
||||
)
|
||||
|
||||
func WithPreInfoGetterDBMetas(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta) context.Context {
|
||||
return context.WithValue(ctx, preInfoGetterKeyDBMetas, dbMetas)
|
||||
}
|
||||
|
||||
func WithPreInfoGetterTableStructuresCache(ctx context.Context, dbInfos map[string]*checkpoints.TidbDBInfo) context.Context {
|
||||
return context.WithValue(ctx, preInfoGetterKeyTableStructsCache, dbInfos)
|
||||
}
|
||||
|
||||
func WithPreInfoGetterSysVarsCache(ctx context.Context, sysVars map[string]string) context.Context {
|
||||
return context.WithValue(ctx, preInfoGetterKeySysVarsCache, sysVars)
|
||||
}
|
||||
|
||||
func WithPreInfoGetterEstimatedSrcSizeCache(ctx context.Context, sizeResult *EstimateSourceDataSizeResult) context.Context {
|
||||
return context.WithValue(ctx, preInfoGetterKeyEstimatedSourceSizeCache, sizeResult)
|
||||
}
|
||||
|
||||
// TargetInfoGetterImpl implements the operations to get information from the target.
|
||||
type TargetInfoGetterImpl struct {
|
||||
cfg *config.Config
|
||||
@ -228,7 +214,7 @@ func (g *TargetInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName stri
|
||||
// GetTargetSysVariablesForImport gets some important system variables for importing on the target.
|
||||
// It implements the TargetInfoGetter interface.
|
||||
// It uses the SQL to fetch sys variables from the target.
|
||||
func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Context) map[string]string {
|
||||
func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Context, _ ...ropts.GetPreInfoOption) map[string]string {
|
||||
sysVars := ObtainImportantVariables(ctx, g.targetDBGlue.GetSQLExecutor(), !isTiDBBackend(g.cfg))
|
||||
// override by manually set vars
|
||||
maps.Copy(sysVars, g.cfg.TiDB.Vars)
|
||||
@ -271,7 +257,7 @@ func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtype
|
||||
// PreRestoreInfoGetterImpl implements the operations to get information used in importing preparation.
|
||||
type PreRestoreInfoGetterImpl struct {
|
||||
cfg *config.Config
|
||||
getPreInfoCfg *GetPreInfoConfig
|
||||
getPreInfoCfg *ropts.GetPreInfoConfig
|
||||
srcStorage storage.ExternalStorage
|
||||
ioWorkers *worker.Pool
|
||||
encBuilder backend.EncodingBuilder
|
||||
@ -280,6 +266,10 @@ type PreRestoreInfoGetterImpl struct {
|
||||
dbMetas []*mydump.MDDatabaseMeta
|
||||
mdDBMetaMap map[string]*mydump.MDDatabaseMeta
|
||||
mdDBTableMetaMap map[string]map[string]*mydump.MDTableMeta
|
||||
|
||||
dbInfosCache map[string]*checkpoints.TidbDBInfo
|
||||
sysVarsCache map[string]string
|
||||
estimatedSizeCache *EstimateSourceDataSizeResult
|
||||
}
|
||||
|
||||
// NewPreRestoreInfoGetter creates a PreRestoreInfoGetterImpl object.
|
||||
@ -290,7 +280,7 @@ func NewPreRestoreInfoGetter(
|
||||
targetInfoGetter TargetInfoGetter,
|
||||
ioWorkers *worker.Pool,
|
||||
encBuilder backend.EncodingBuilder,
|
||||
opts ...GetPreInfoOption,
|
||||
opts ...ropts.GetPreInfoOption,
|
||||
) (*PreRestoreInfoGetterImpl, error) {
|
||||
if ioWorkers == nil {
|
||||
ioWorkers = worker.NewPool(context.Background(), cfg.App.IOConcurrency, "pre_info_getter_io")
|
||||
@ -306,9 +296,9 @@ func NewPreRestoreInfoGetter(
|
||||
}
|
||||
}
|
||||
|
||||
getPreInfoCfg := NewDefaultGetPreInfoConfig()
|
||||
getPreInfoCfg := ropts.NewDefaultGetPreInfoConfig()
|
||||
for _, o := range opts {
|
||||
o.Apply(getPreInfoCfg)
|
||||
o(getPreInfoCfg)
|
||||
}
|
||||
result := &PreRestoreInfoGetterImpl{
|
||||
cfg: cfg,
|
||||
@ -347,34 +337,34 @@ func (p *PreRestoreInfoGetterImpl) Init() {
|
||||
// GetAllTableStructures gets all the table structures with the information from both the source and the target.
|
||||
// It implements the PreRestoreInfoGetter interface.
|
||||
// It has a caching mechanism: the table structures will be obtained from the source only once.
|
||||
func (p *PreRestoreInfoGetterImpl) GetAllTableStructures(ctx context.Context) (map[string]*checkpoints.TidbDBInfo, error) {
|
||||
func (p *PreRestoreInfoGetterImpl) GetAllTableStructures(ctx context.Context, opts ...ropts.GetPreInfoOption) (map[string]*checkpoints.TidbDBInfo, error) {
|
||||
var (
|
||||
dbInfos map[string]*checkpoints.TidbDBInfo
|
||||
err error
|
||||
)
|
||||
dbInfosVal := ctx.Value(preInfoGetterKeyTableStructsCache)
|
||||
if dbInfosVal != nil {
|
||||
if v, ok := dbInfosVal.(map[string]*checkpoints.TidbDBInfo); ok {
|
||||
dbInfos = v
|
||||
}
|
||||
getPreInfoCfg := p.getPreInfoCfg.Clone()
|
||||
for _, o := range opts {
|
||||
o(getPreInfoCfg)
|
||||
}
|
||||
if dbInfos != nil {
|
||||
dbInfos = p.dbInfosCache
|
||||
if dbInfos != nil && !getPreInfoCfg.ForceReloadCache {
|
||||
return dbInfos, nil
|
||||
}
|
||||
dbInfos, err = LoadSchemaInfo(ctx, p.dbMetas, func(ctx context.Context, dbName string) ([]*model.TableInfo, error) {
|
||||
return p.getTableStructuresByFileMeta(ctx, p.mdDBMetaMap[dbName])
|
||||
return p.getTableStructuresByFileMeta(ctx, p.mdDBMetaMap[dbName], getPreInfoCfg)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
p.dbInfosCache = dbInfos
|
||||
return dbInfos, nil
|
||||
}
|
||||
|
||||
func (p *PreRestoreInfoGetterImpl) getTableStructuresByFileMeta(ctx context.Context, dbSrcFileMeta *mydump.MDDatabaseMeta) ([]*model.TableInfo, error) {
|
||||
func (p *PreRestoreInfoGetterImpl) getTableStructuresByFileMeta(ctx context.Context, dbSrcFileMeta *mydump.MDDatabaseMeta, getPreInfoCfg *ropts.GetPreInfoConfig) ([]*model.TableInfo, error) {
|
||||
dbName := dbSrcFileMeta.Name
|
||||
currentTableInfosFromDB, err := p.targetInfoGetter.FetchRemoteTableModels(ctx, dbName)
|
||||
if err != nil {
|
||||
if p.getPreInfoCfg != nil && p.getPreInfoCfg.IgnoreDBNotExist {
|
||||
if getPreInfoCfg != nil && getPreInfoCfg.IgnoreDBNotExist {
|
||||
dbNotExistErr := dbterror.ClassSchema.NewStd(errno.ErrBadDB).FastGenByArgs(dbName)
|
||||
// The returned error is an error showing get info request error,
|
||||
// and attaches the detailed error response as a string.
|
||||
@ -510,15 +500,17 @@ func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context,
|
||||
|
||||
// EstimateSourceDataSize estimates the datasize to generate during the import as well as some other sub-informaiton.
|
||||
// It implements the PreRestoreInfoGetter interface.
|
||||
func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context) (*EstimateSourceDataSizeResult, error) {
|
||||
// It has a cache mechanism. The estimated size will only calculated once.
|
||||
// The caching behavior can be changed by appending the `ForceReloadCache(true)` option.
|
||||
func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context, opts ...ropts.GetPreInfoOption) (*EstimateSourceDataSizeResult, error) {
|
||||
var result *EstimateSourceDataSizeResult
|
||||
resultVal := ctx.Value(preInfoGetterKeyEstimatedSourceSizeCache)
|
||||
if resultVal != nil {
|
||||
if v, ok := resultVal.(*EstimateSourceDataSizeResult); ok {
|
||||
result = v
|
||||
}
|
||||
|
||||
getPreInfoCfg := p.getPreInfoCfg.Clone()
|
||||
for _, o := range opts {
|
||||
o(getPreInfoCfg)
|
||||
}
|
||||
if result != nil {
|
||||
result = p.estimatedSizeCache
|
||||
if result != nil && !getPreInfoCfg.ForceReloadCache {
|
||||
return result, nil
|
||||
}
|
||||
sizeWithIndex := int64(0)
|
||||
@ -575,6 +567,7 @@ func (p *PreRestoreInfoGetterImpl) EstimateSourceDataSize(ctx context.Context) (
|
||||
SizeWithoutIndex: sourceTotalSize,
|
||||
HasUnsortedBigTables: (unSortedBigTableCount > 0),
|
||||
}
|
||||
p.estimatedSizeCache = result
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -777,16 +770,18 @@ func (p *PreRestoreInfoGetterImpl) CheckVersionRequirements(ctx context.Context)
|
||||
// GetTargetSysVariablesForImport gets some important systam variables for importing on the target.
|
||||
// It implements the PreRestoreInfoGetter interface.
|
||||
// It has caching mechanism.
|
||||
func (p *PreRestoreInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Context) map[string]string {
|
||||
func (p *PreRestoreInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Context, opts ...ropts.GetPreInfoOption) map[string]string {
|
||||
var sysVars map[string]string
|
||||
sysVarsVal := ctx.Value(preInfoGetterKeySysVarsCache)
|
||||
if sysVarsVal != nil {
|
||||
if v, ok := sysVarsVal.(map[string]string); ok {
|
||||
sysVars = v
|
||||
}
|
||||
|
||||
getPreInfoCfg := p.getPreInfoCfg.Clone()
|
||||
for _, o := range opts {
|
||||
o(getPreInfoCfg)
|
||||
}
|
||||
if sysVars != nil {
|
||||
sysVars = p.sysVarsCache
|
||||
if sysVars != nil && !getPreInfoCfg.ForceReloadCache {
|
||||
return sysVars
|
||||
}
|
||||
return p.targetInfoGetter.GetTargetSysVariablesForImport(ctx)
|
||||
sysVars = p.targetInfoGetter.GetTargetSysVariablesForImport(ctx)
|
||||
p.sysVarsCache = sysVars
|
||||
return sysVars
|
||||
}
|
||||
|
||||
@ -1,42 +0,0 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package restore
|
||||
|
||||
type GetPreInfoConfig struct {
|
||||
IgnoreDBNotExist bool
|
||||
}
|
||||
|
||||
func NewDefaultGetPreInfoConfig() *GetPreInfoConfig {
|
||||
return &GetPreInfoConfig{
|
||||
IgnoreDBNotExist: false,
|
||||
}
|
||||
}
|
||||
|
||||
type GetPreInfoOption interface {
|
||||
Apply(c *GetPreInfoConfig)
|
||||
}
|
||||
|
||||
type ignoreDBNotExistOption struct {
|
||||
ignoreDBNotExist bool
|
||||
}
|
||||
|
||||
func (o *ignoreDBNotExistOption) Apply(c *GetPreInfoConfig) {
|
||||
c.IgnoreDBNotExist = o.ignoreDBNotExist
|
||||
}
|
||||
|
||||
func WithIgnoreDBNotExist(ignoreDBNotExist bool) GetPreInfoOption {
|
||||
return &ignoreDBNotExistOption{
|
||||
ignoreDBNotExist: ignoreDBNotExist,
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/config"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
"github.com/pingcap/tidb/errno"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/types"
|
||||
@ -223,7 +224,7 @@ func TestGetPreInfoGetAllTableStructures(t *testing.T) {
|
||||
|
||||
cfg := config.NewConfig()
|
||||
cfg.TikvImporter.Backend = config.BackendLocal
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, WithIgnoreDBNotExist(true))
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true))
|
||||
require.NoError(t, err)
|
||||
tblStructMap, err := ig.GetAllTableStructures(ctx)
|
||||
require.Nil(t, err)
|
||||
@ -396,7 +397,7 @@ func TestGetPreInfoSampleSource(t *testing.T) {
|
||||
mockTarget := mock.NewMockTargetInfo()
|
||||
cfg := config.NewConfig()
|
||||
cfg.TikvImporter.Backend = config.BackendLocal
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, WithIgnoreDBNotExist(true))
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
mdDBMeta := mockSrc.GetAllDBFileMetas()[0]
|
||||
@ -486,7 +487,7 @@ func TestGetPreInfoEstimateSourceSize(t *testing.T) {
|
||||
mockTarget := mock.NewMockTargetInfo()
|
||||
cfg := config.NewConfig()
|
||||
cfg.TikvImporter.Backend = config.BackendLocal
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, WithIgnoreDBNotExist(true))
|
||||
ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
sizeResult, err := ig.EstimateSourceDataSize(ctx)
|
||||
|
||||
@ -7,6 +7,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//br/pkg/lightning/mydump",
|
||||
"//br/pkg/lightning/restore/opts",
|
||||
"//br/pkg/storage",
|
||||
"//errno",
|
||||
"//parser/model",
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
"github.com/pingcap/tidb/br/pkg/storage"
|
||||
"github.com/pingcap/tidb/errno"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
@ -215,7 +216,7 @@ func (t *MockTargetInfo) FetchRemoteTableModels(ctx context.Context, schemaName
|
||||
|
||||
// GetTargetSysVariablesForImport gets some important systam variables for importing on the target.
|
||||
// It implements the TargetInfoGetter interface.
|
||||
func (t *MockTargetInfo) GetTargetSysVariablesForImport(ctx context.Context) map[string]string {
|
||||
func (t *MockTargetInfo) GetTargetSysVariablesForImport(ctx context.Context, _ ...ropts.GetPreInfoOption) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for k, v := range t.sysVarMap {
|
||||
result[k] = v
|
||||
|
||||
12
br/pkg/lightning/restore/opts/BUILD.bazel
Normal file
12
br/pkg/lightning/restore/opts/BUILD.bazel
Normal file
@ -0,0 +1,12 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "opts",
|
||||
srcs = [
|
||||
"get_pre_info_opts.go",
|
||||
"precheck_opts.go",
|
||||
],
|
||||
importpath = "github.com/pingcap/tidb/br/pkg/lightning/restore/opts",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//br/pkg/lightning/mydump"],
|
||||
)
|
||||
55
br/pkg/lightning/restore/opts/get_pre_info_opts.go
Normal file
55
br/pkg/lightning/restore/opts/get_pre_info_opts.go
Normal file
@ -0,0 +1,55 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// opts contains all kinds of options definitions that can affect the behavior of restore & get infos.
|
||||
package opts
|
||||
|
||||
// GetPreInfoConfig stores some configs to affect behavior to get pre restore infos.
|
||||
type GetPreInfoConfig struct {
|
||||
IgnoreDBNotExist bool
|
||||
ForceReloadCache bool
|
||||
}
|
||||
|
||||
// Clone clones a new independent config object from the original one.
|
||||
func (c *GetPreInfoConfig) Clone() *GetPreInfoConfig {
|
||||
clonedCfg := NewDefaultGetPreInfoConfig()
|
||||
if c != nil {
|
||||
*clonedCfg = *c
|
||||
}
|
||||
return clonedCfg
|
||||
}
|
||||
|
||||
// NewDefaultGetPreInfoConfig returns the default get-pre-info config.
|
||||
func NewDefaultGetPreInfoConfig() *GetPreInfoConfig {
|
||||
return &GetPreInfoConfig{
|
||||
IgnoreDBNotExist: false,
|
||||
ForceReloadCache: false,
|
||||
}
|
||||
}
|
||||
|
||||
// GetPreInfoOption defines the type for passing optional arguments for PreInfoGetter methods.
|
||||
type GetPreInfoOption func(c *GetPreInfoConfig)
|
||||
|
||||
// WithIgnoreDBNotExist sets whether to ignore DB not exist error when getting DB schemas.
|
||||
func WithIgnoreDBNotExist(ignoreDBNotExist bool) GetPreInfoOption {
|
||||
return func(c *GetPreInfoConfig) {
|
||||
c.IgnoreDBNotExist = ignoreDBNotExist
|
||||
}
|
||||
}
|
||||
|
||||
// ForceReloadCache sets whether to reload the cache for some caching results.
|
||||
func ForceReloadCache(forceReloadCache bool) GetPreInfoOption {
|
||||
return func(c *GetPreInfoConfig) {
|
||||
c.ForceReloadCache = forceReloadCache
|
||||
}
|
||||
}
|
||||
36
br/pkg/lightning/restore/opts/precheck_opts.go
Normal file
36
br/pkg/lightning/restore/opts/precheck_opts.go
Normal file
@ -0,0 +1,36 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// opts contains all kinds of options definitions that can affect the behavior of restore & get infos.
|
||||
package opts
|
||||
|
||||
import "github.com/pingcap/tidb/br/pkg/lightning/mydump"
|
||||
|
||||
type PrecheckItemBuilderConfig struct {
|
||||
PreInfoGetterOptions []GetPreInfoOption
|
||||
MDLoaderSetupOptions []mydump.MDLoaderSetupOption
|
||||
}
|
||||
|
||||
type PrecheckItemBuilderOption func(c *PrecheckItemBuilderConfig)
|
||||
|
||||
func WithPreInfoGetterOptions(opts ...GetPreInfoOption) PrecheckItemBuilderOption {
|
||||
return func(c *PrecheckItemBuilderConfig) {
|
||||
c.PreInfoGetterOptions = append([]GetPreInfoOption{}, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func WithMDLoaderSetupOptions(opts ...mydump.MDLoaderSetupOption) PrecheckItemBuilderOption {
|
||||
return func(c *PrecheckItemBuilderConfig) {
|
||||
c.MDLoaderSetupOptions = append([]mydump.MDLoaderSetupOption{}, opts...)
|
||||
}
|
||||
}
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/config"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
)
|
||||
|
||||
type CheckItemID string
|
||||
@ -49,43 +50,6 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
|
||||
return context.WithValue(ctx, key, val)
|
||||
}
|
||||
|
||||
type PrecheckItemBuilderConfig struct {
|
||||
PreInfoGetterOptions []GetPreInfoOption
|
||||
MDLoaderSetupOptions []mydump.MDLoaderSetupOption
|
||||
}
|
||||
|
||||
type PrecheckItemBuilderOption interface {
|
||||
Apply(c *PrecheckItemBuilderConfig)
|
||||
}
|
||||
|
||||
type preInfoGetterOptsForBuilder struct {
|
||||
opts []GetPreInfoOption
|
||||
}
|
||||
|
||||
func (o *preInfoGetterOptsForBuilder) Apply(c *PrecheckItemBuilderConfig) {
|
||||
c.PreInfoGetterOptions = append([]GetPreInfoOption{}, o.opts...)
|
||||
}
|
||||
|
||||
func WithPreInfoGetterOptions(opts ...GetPreInfoOption) PrecheckItemBuilderOption {
|
||||
return &preInfoGetterOptsForBuilder{
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
type mdLoaderSetupOptsForBuilder struct {
|
||||
opts []mydump.MDLoaderSetupOption
|
||||
}
|
||||
|
||||
func (o *mdLoaderSetupOptsForBuilder) Apply(c *PrecheckItemBuilderConfig) {
|
||||
c.MDLoaderSetupOptions = append([]mydump.MDLoaderSetupOption{}, o.opts...)
|
||||
}
|
||||
|
||||
func WithMDLoaderSetupOptions(opts ...mydump.MDLoaderSetupOption) PrecheckItemBuilderOption {
|
||||
return &mdLoaderSetupOptsForBuilder{
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
type PrecheckItemBuilder struct {
|
||||
cfg *config.Config
|
||||
dbMetas []*mydump.MDDatabaseMeta
|
||||
@ -93,11 +57,11 @@ type PrecheckItemBuilder struct {
|
||||
checkpointsDB checkpoints.DB
|
||||
}
|
||||
|
||||
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
|
||||
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
|
||||
var gerr error
|
||||
builderCfg := new(PrecheckItemBuilderConfig)
|
||||
builderCfg := new(ropts.PrecheckItemBuilderConfig)
|
||||
for _, o := range opts {
|
||||
o.Apply(builderCfg)
|
||||
o(builderCfg)
|
||||
}
|
||||
targetDB, err := DBFromConfig(ctx, cfg.TiDB)
|
||||
if err != nil {
|
||||
@ -180,3 +144,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
|
||||
return nil, errors.Errorf("unsupported check item: %v", checkID)
|
||||
}
|
||||
}
|
||||
|
||||
// GetPreInfoGetter gets the pre restore info getter from the builder.
|
||||
func (b *PrecheckItemBuilder) GetPreInfoGetter() PreRestoreInfoGetter {
|
||||
return b.preInfoGetter
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/config"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/log"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
@ -59,7 +60,7 @@ func (s *precheckImplSuite) setMockImportData(mockDataMap map[string]*mock.MockD
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.preInfoGetter, err = NewPreRestoreInfoGetter(s.cfg, s.mockSrc.GetAllDBFileMetas(), s.mockSrc.GetStorage(), s.mockTarget, nil, nil, WithIgnoreDBNotExist(true))
|
||||
s.preInfoGetter, err = NewPreRestoreInfoGetter(s.cfg, s.mockSrc.GetAllDBFileMetas(), s.mockSrc.GetStorage(), s.mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1966,8 +1966,6 @@ func isTiDBBackend(cfg *config.Config) bool {
|
||||
// 4. Lightning configuration
|
||||
// before restore tables start.
|
||||
func (rc *Controller) preCheckRequirements(ctx context.Context) error {
|
||||
ctx = WithPreInfoGetterSysVarsCache(ctx, rc.sysVars)
|
||||
ctx = WithPreInfoGetterTableStructuresCache(ctx, rc.dbInfos)
|
||||
if err := rc.DataCheck(ctx); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -2036,18 +2034,17 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
|
||||
needCheck = taskCheckpoints == nil
|
||||
}
|
||||
if needCheck {
|
||||
withSizeCacheCtx := WithPreInfoGetterEstimatedSrcSizeCache(ctx, estimatedSizeResult)
|
||||
err = rc.localResource(withSizeCacheCtx)
|
||||
err = rc.localResource(ctx)
|
||||
if err != nil {
|
||||
return common.ErrCheckLocalResource.Wrap(err).GenWithStackByArgs()
|
||||
}
|
||||
if err := rc.clusterResource(withSizeCacheCtx); err != nil {
|
||||
if err := rc.clusterResource(ctx); err != nil {
|
||||
if err1 := rc.taskMgr.CleanupTask(ctx); err1 != nil {
|
||||
log.FromContext(ctx).Warn("cleanup task failed", zap.Error(err1))
|
||||
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
|
||||
}
|
||||
}
|
||||
if err := rc.checkClusterRegion(withSizeCacheCtx); err != nil {
|
||||
if err := rc.checkClusterRegion(ctx); err != nil {
|
||||
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,6 +247,8 @@ func TestPreCheckFailed(t *testing.T) {
|
||||
require.Regexp(t, ".*mock init meta failure", err.Error())
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
|
||||
// clear the sys variable cache
|
||||
preInfoGetter.sysVarsCache = nil
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectQuery("SHOW VARIABLES WHERE Variable_name IN .*").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
|
||||
|
||||
@ -48,6 +48,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/metric"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
|
||||
restoremock "github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
|
||||
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/verification"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/web"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/worker"
|
||||
@ -1138,10 +1139,11 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
|
||||
return nil
|
||||
})
|
||||
require.NoError(s.T(), err)
|
||||
err = rc.clusterResource(WithPreInfoGetterEstimatedSrcSizeCache(ctx, &EstimateSourceDataSizeResult{
|
||||
preInfoGetter.estimatedSizeCache = &EstimateSourceDataSizeResult{
|
||||
SizeWithIndex: sourceSize,
|
||||
SizeWithoutIndex: sourceSize,
|
||||
}))
|
||||
}
|
||||
err = rc.clusterResource(ctx)
|
||||
require.NoError(s.T(), err)
|
||||
|
||||
require.Equal(s.T(), ca.expectErrorCount, template.FailedCount(Critical))
|
||||
@ -1283,7 +1285,8 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
|
||||
precheckItemBuilder: theCheckBuilder,
|
||||
}
|
||||
|
||||
ctx := WithPreInfoGetterTableStructuresCache(context.Background(), rc.dbInfos)
|
||||
preInfoGetter.dbInfosCache = rc.dbInfos
|
||||
ctx := context.Background()
|
||||
err := rc.checkClusterRegion(ctx)
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), ca.expectErrorCnt, template.FailedCount(Critical))
|
||||
@ -1433,19 +1436,22 @@ func (s *tableRestoreSuite) TestEstimate() {
|
||||
preInfoGetter: preInfoGetter,
|
||||
precheckItemBuilder: theCheckBuilder,
|
||||
}
|
||||
ctx = WithPreInfoGetterTableStructuresCache(ctx, dbInfos)
|
||||
source, _, _, err := rc.estimateSourceData(ctx)
|
||||
preInfoGetter.dbInfosCache = dbInfos
|
||||
estimateResult, err := preInfoGetter.EstimateSourceDataSize(ctx)
|
||||
s.Require().NoError(err)
|
||||
source := estimateResult.SizeWithIndex
|
||||
// Because this file is small than region split size so we does not sample it.
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), s.tableMeta.TotalSize, source)
|
||||
s.Require().Equal(s.tableMeta.TotalSize, source)
|
||||
s.tableMeta.TotalSize = int64(config.SplitRegionSize)
|
||||
source, _, _, err = rc.estimateSourceData(ctx)
|
||||
require.NoError(s.T(), err)
|
||||
require.Greater(s.T(), source, s.tableMeta.TotalSize)
|
||||
estimateResult, err = preInfoGetter.EstimateSourceDataSize(ctx, ropts.ForceReloadCache(true))
|
||||
s.Require().NoError(err)
|
||||
source = estimateResult.SizeWithIndex
|
||||
s.Require().Greater(source, s.tableMeta.TotalSize)
|
||||
rc.cfg.TikvImporter.Backend = config.BackendTiDB
|
||||
source, _, _, err = rc.estimateSourceData(ctx)
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), s.tableMeta.TotalSize, source)
|
||||
estimateResult, err = preInfoGetter.EstimateSourceDataSize(ctx, ropts.ForceReloadCache(true))
|
||||
s.Require().NoError(err)
|
||||
source = estimateResult.SizeWithIndex
|
||||
s.Require().Equal(s.tableMeta.TotalSize, source)
|
||||
}
|
||||
|
||||
func (s *tableRestoreSuite) TestSchemaIsValid() {
|
||||
@ -1849,7 +1855,8 @@ func (s *tableRestoreSuite) TestSchemaIsValid() {
|
||||
ioWorkers: ioWorkers,
|
||||
}
|
||||
ci := NewSchemaCheckItem(cfg, preInfoGetter, nil, nil).(*schemaCheckItem)
|
||||
msgs, err := ci.SchemaIsValid(WithPreInfoGetterTableStructuresCache(ctx, ca.dbInfos), ca.tableMeta)
|
||||
preInfoGetter.dbInfosCache = ca.dbInfos
|
||||
msgs, err := ci.SchemaIsValid(ctx, ca.tableMeta)
|
||||
require.NoError(s.T(), err)
|
||||
require.Len(s.T(), msgs, ca.MsgNum)
|
||||
if len(msgs) > 0 {
|
||||
@ -1919,7 +1926,8 @@ func (s *tableRestoreSuite) TestGBKEncodedSchemaIsValid() {
|
||||
ioWorkers: ioWorkers,
|
||||
}
|
||||
ci := NewSchemaCheckItem(cfg, preInfoGetter, nil, nil).(*schemaCheckItem)
|
||||
msgs, err := ci.SchemaIsValid(WithPreInfoGetterTableStructuresCache(ctx, dbInfos), &mydump.MDTableMeta{
|
||||
preInfoGetter.dbInfosCache = dbInfos
|
||||
msgs, err := ci.SchemaIsValid(ctx, &mydump.MDTableMeta{
|
||||
DB: "db1",
|
||||
Name: "gbk_table",
|
||||
DataFiles: []mydump.FileInfo{
|
||||
|
||||
Reference in New Issue
Block a user