Files
tidb/dumpling/export/config.go

798 lines
27 KiB
Go

// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package export
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"slices"
"strconv"
"strings"
"text/template"
"time"
"github.com/coreos/go-semver/semver"
"github.com/docker/go-units"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/compressedio"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/promutil"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/atomic"
"go.uber.org/zap"
)
const (
flagDatabase = "database"
flagTablesList = "tables-list"
flagHost = "host"
flagUser = "user"
flagPort = "port"
flagPassword = "password"
flagAllowCleartextPasswords = "allow-cleartext-passwords"
flagThreads = "threads"
flagFilesize = "filesize"
flagStatementSize = "statement-size"
flagOutput = "output"
flagLoglevel = "loglevel"
flagLogfile = "logfile"
flagLogfmt = "logfmt"
flagConsistency = "consistency"
flagSnapshot = "snapshot"
flagNoViews = "no-views"
flagNoSequences = "no-sequences"
flagSortByPk = "order-by-primary-key"
flagStatusAddr = "status-addr"
flagRows = "rows"
flagWhere = "where"
flagEscapeBackslash = "escape-backslash"
flagFiletype = "filetype"
flagNoHeader = "no-header"
flagNoSchemas = "no-schemas"
flagNoData = "no-data"
flagCsvNullValue = "csv-null-value"
flagSQL = "sql"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagDumpEmptyDatabase = "dump-empty-database"
flagTidbMemQuotaQuery = "tidb-mem-quota-query"
flagCA = "ca"
flagCert = "cert"
flagKey = "key"
flagCsvSeparator = "csv-separator"
flagCsvDelimiter = "csv-delimiter"
flagCsvLineTerminator = "csv-line-terminator"
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"
flagCompress = "compress"
flagCsvOutputDialect = "csv-output-dialect"
// FlagHelp represents the help flag
FlagHelp = "help"
)
// CSVDialect is the dialect of the CSV output for compatible with different import target
type CSVDialect int
const (
// CSVDialectDefault is the default dialect, which is MySQL/MariaDB/TiDB etc.
CSVDialectDefault CSVDialect = iota
// CSVDialectSnowflake is the dialect of Snowflake
CSVDialectSnowflake
// CSVDialectRedshift is the dialect of Redshift
CSVDialectRedshift
// CSVDialectBigQuery is the dialect of BigQuery
CSVDialectBigQuery
)
// BinaryFormat is the format of binary data
// Three standard formats are supported: UTF8, HEX and Base64 now.
type BinaryFormat int
const (
// BinaryFormatUTF8 is the default format, format binary data as UTF8 string
BinaryFormatUTF8 BinaryFormat = iota
// BinaryFormatHEX format binary data as HEX string, e.g. 12ABCD
BinaryFormatHEX
// BinaryFormatBase64 format binary data as Base64 string, e.g. 123qwer==
BinaryFormatBase64
)
// DialectBinaryFormatMap is the map of dialect and binary format
var DialectBinaryFormatMap = map[CSVDialect]BinaryFormat{
CSVDialectDefault: BinaryFormatUTF8,
CSVDialectSnowflake: BinaryFormatHEX,
CSVDialectRedshift: BinaryFormatHEX,
CSVDialectBigQuery: BinaryFormatBase64,
}
// Config is the dump config for dumpling
type Config struct {
objstore.BackendOptions
SpecifiedTables bool
AllowCleartextPasswords bool
SortByPk bool
NoViews bool
NoSequences bool
NoHeader bool
NoSchemas bool
NoData bool
CompleteInsert bool
TransactionalConsistency bool
EscapeBackslash bool
DumpEmptyDatabase bool
PosAfterConnect bool
CompressType compressedio.CompressType
Host string
Port int
Threads int
User string
Password string `json:"-"`
Security struct {
TLS *tls.Config `json:"-"`
CAPath string
CertPath string
KeyPath string
SSLCABytes []byte `json:"-"`
SSLCertBytes []byte `json:"-"`
SSLKeyBytes []byte `json:"-"`
}
LogLevel string
LogFile string
LogFormat string
OutputDirPath string
StatusAddr string
Snapshot string
Consistency string
CsvNullValue string
SQL string
CsvSeparator string
CsvDelimiter string
CsvLineTerminator string
Databases []string
TableFilter filter.Filter `json:"-"`
Where string
FileType string
ServerInfo version.ServerInfo
Logger *zap.Logger `json:"-"`
OutputFileTemplate *template.Template `json:"-"`
Rows uint64
ReadTimeout time.Duration
TiDBMemQuotaQuery uint64
FileSize uint64
StatementSize uint64
SessionParams map[string]any
Tables DatabaseTables
CollationCompatible string
CsvOutputDialect CSVDialect
Labels prometheus.Labels `json:"-"`
PromFactory promutil.Factory `json:"-"`
PromRegistry promutil.Registry `json:"-"`
ExtStorage storeapi.Storage `json:"-"`
MinTLSVersion uint16 `json:"-"`
IOTotalBytes *atomic.Uint64
Net string
}
// ServerInfoUnknown is the unknown database type to dumpling
var ServerInfoUnknown = version.ServerInfo{
ServerType: version.ServerTypeUnknown,
ServerVersion: nil,
}
// DefaultConfig returns the default export Config for dumpling
func DefaultConfig() *Config {
allFilter, _ := filter.Parse([]string{"*.*"})
return &Config{
Databases: nil,
Host: "127.0.0.1",
User: "root",
Port: 3306,
Password: "",
Threads: 4,
Logger: nil,
StatusAddr: ":8281",
FileSize: UnspecifiedSize,
StatementSize: DefaultStatementSize,
OutputDirPath: ".",
ServerInfo: ServerInfoUnknown,
SortByPk: true,
Tables: nil,
Snapshot: "",
Consistency: ConsistencyTypeAuto,
NoViews: true,
NoSequences: true,
Rows: UnspecifiedSize,
Where: "",
EscapeBackslash: true,
FileType: "",
NoHeader: false,
NoSchemas: false,
NoData: false,
CsvNullValue: "\\N",
SQL: "",
TableFilter: allFilter,
DumpEmptyDatabase: true,
CsvDelimiter: "\"",
CsvSeparator: ",",
CsvLineTerminator: "\r\n",
SessionParams: make(map[string]any),
OutputFileTemplate: DefaultOutputFileTemplate,
PosAfterConnect: false,
CollationCompatible: LooseCollationCompatible,
CsvOutputDialect: CSVDialectDefault,
SpecifiedTables: false,
PromFactory: promutil.NewDefaultFactory(),
PromRegistry: promutil.NewDefaultRegistry(),
TransactionalConsistency: true,
}
}
// String returns dumpling's config in json format
func (conf *Config) String() string {
cfg, err := json.Marshal(conf)
if err != nil && conf.Logger != nil {
conf.Logger.Error("fail to marshal config to json", zap.Error(err))
}
return string(cfg)
}
// GetDriverConfig returns the MySQL driver config from Config.
func (conf *Config) GetDriverConfig(db string) *mysql.Config {
driverCfg := mysql.NewConfig()
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
hostPort := net.JoinHostPort(conf.Host, strconv.Itoa(conf.Port))
driverCfg.User = conf.User
driverCfg.Passwd = conf.Password
driverCfg.Net = "tcp"
if conf.Net != "" {
driverCfg.Net = conf.Net
}
driverCfg.Addr = hostPort
driverCfg.DBName = db
driverCfg.Collation = "utf8mb4_general_ci"
driverCfg.ReadTimeout = conf.ReadTimeout
driverCfg.WriteTimeout = 30 * time.Second
driverCfg.InterpolateParams = true
driverCfg.MaxAllowedPacket = 0
if conf.Security.TLS != nil {
driverCfg.TLS = conf.Security.TLS
} else {
// Use TLS first.
driverCfg.AllowFallbackToPlaintext = true
minTLSVersion := uint16(tls.VersionTLS12)
if conf.MinTLSVersion != 0 {
minTLSVersion = conf.MinTLSVersion
}
/* #nosec G402 */
driverCfg.TLS = &tls.Config{
InsecureSkipVerify: true,
MinVersion: minTLSVersion,
NextProtos: []string{"h2", "http/1.1"}, // specify `h2` to let Go use HTTP/2.
}
}
if conf.AllowCleartextPasswords {
driverCfg.AllowCleartextPasswords = true
}
failpoint.Inject("SetWaitTimeout", func(val failpoint.Value) {
driverCfg.Params = map[string]string{
"wait_timeout": strconv.Itoa(val.(int)),
}
})
return driverCfg
}
func timestampDirName() string {
return fmt.Sprintf("./export-%s", time.Now().Format(time.RFC3339))
}
// DefineFlags defines flags of dumpling's configuration
func (*Config) DefineFlags(flags *pflag.FlagSet) {
objstore.DefineFlags(flags)
flags.StringSliceP(flagDatabase, "B", nil, "Databases to dump")
flags.StringSliceP(flagTablesList, "T", nil, "Comma delimited table list to dump; must be qualified table names")
flags.StringP(flagHost, "h", "127.0.0.1", "The host to connect to")
flags.StringP(flagUser, "u", "root", "Username with privileges to run the dump")
flags.IntP(flagPort, "P", 4000, "TCP/IP port to connect to")
flags.StringP(flagPassword, "p", "", "User password")
flags.Bool(flagAllowCleartextPasswords, false, "Allow passwords to be sent in cleartext (warning: don't use without TLS)")
flags.IntP(flagThreads, "t", 4, "Number of goroutines to use, default 4")
flags.StringP(flagFilesize, "F", "", "The approximate size of output file")
flags.Uint64P(flagStatementSize, "s", DefaultStatementSize, "Attempted size of INSERT statement in bytes")
flags.StringP(flagOutput, "o", timestampDirName(), "Output directory")
flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}")
flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console")
flags.String(flagLogfmt, "text", "Log `format`: {text|json}")
flags.String(flagConsistency, ConsistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagSnapshot, "", "Snapshot position (uint64 or MySQL style string timestamp). Valid only when consistency=snapshot")
flags.BoolP(flagNoViews, "W", true, "Do not dump views")
flags.Bool(flagNoSequences, true, "Do not dump sequences")
flags.Bool(flagSortByPk, true, "Sort dump results by primary key through order by sql")
flags.String(flagStatusAddr, ":8281", "dumpling API server and pprof addr")
flags.Uint64P(flagRows, "r", UnspecifiedSize, "If specified, dumpling will split table into chunks and concurrently dump them to different files to improve efficiency. For TiDB v3.0+, specify this will make dumpling split table with each file one TiDB region(no matter how many rows is).\n"+
"If not specified, dumpling will dump table without inner-concurrency which could be relatively slow. default unlimited")
flags.String(flagWhere, "", "Dump only selected records")
flags.Bool(flagEscapeBackslash, true, "use backslash to escape special characters")
flags.String(flagFiletype, "", "The type of export file (sql/csv)")
flags.Bool(flagNoHeader, false, "whether not to dump CSV table header")
flags.BoolP(flagNoSchemas, "m", false, "Do not dump table schemas with the data")
flags.BoolP(flagNoData, "d", false, "Do not dump table data")
flags.String(flagCsvNullValue, "\\N", "The null value used when export to csv")
flags.StringP(flagSQL, "S", "", "Dump data with given sql. This argument doesn't support concurrent dump")
_ = flags.MarkHidden(flagSQL)
flags.StringSliceP(flagFilter, "f", []string{"*.*", DefaultTableFilter}, "filter to select which tables to dump")
flags.Bool(flagCaseSensitive, false, "whether the filter should be case-sensitive")
flags.Bool(flagDumpEmptyDatabase, true, "whether to dump empty database")
flags.Uint64(flagTidbMemQuotaQuery, UnspecifiedSize, "The maximum memory limit for a single SQL statement, in bytes.")
flags.String(flagCA, "", "The path name to the certificate authority file for TLS connection")
flags.String(flagCert, "", "The path name to the client certificate file for TLS connection")
flags.String(flagKey, "", "The path name to the client private key file for TLS connection")
flags.String(flagCsvSeparator, ",", "The separator for csv files, default ','")
flags.String(flagCsvDelimiter, "\"", "The delimiter for values in csv files, default '\"'")
flags.String(flagCsvLineTerminator, "\r\n", "The line terminator for csv files, default '\\r\\n'")
flags.String(flagOutputFilenameTemplate, "", "The output filename template (without file extension)")
flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names")
flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`)
flags.Bool(FlagHelp, false, "Print help message and quit")
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
_ = flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
_ = flags.MarkHidden(flagTransactionalConsistency)
flags.StringP(flagCompress, "c", "", "Compress output file type, support 'gzip', 'snappy', 'zstd', 'no-compression' now")
flags.String(flagCsvOutputDialect, "", "The dialect of output CSV file, support 'snowflake', 'redshift', 'bigquery' now")
}
// ParseFromFlags parses dumpling's export.Config from flags
// nolint: gocyclo
func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
conf.Databases, err = flags.GetStringSlice(flagDatabase)
if err != nil {
return errors.Trace(err)
}
conf.Host, err = flags.GetString(flagHost)
if err != nil {
return errors.Trace(err)
}
conf.User, err = flags.GetString(flagUser)
if err != nil {
return errors.Trace(err)
}
conf.Port, err = flags.GetInt(flagPort)
if err != nil {
return errors.Trace(err)
}
conf.Password, err = flags.GetString(flagPassword)
if err != nil {
return errors.Trace(err)
}
conf.AllowCleartextPasswords, err = flags.GetBool(flagAllowCleartextPasswords)
if err != nil {
return errors.Trace(err)
}
conf.Threads, err = flags.GetInt(flagThreads)
if err != nil {
return errors.Trace(err)
}
conf.StatementSize, err = flags.GetUint64(flagStatementSize)
if err != nil {
return errors.Trace(err)
}
conf.OutputDirPath, err = flags.GetString(flagOutput)
if err != nil {
return errors.Trace(err)
}
conf.LogLevel, err = flags.GetString(flagLoglevel)
if err != nil {
return errors.Trace(err)
}
conf.LogFile, err = flags.GetString(flagLogfile)
if err != nil {
return errors.Trace(err)
}
conf.LogFormat, err = flags.GetString(flagLogfmt)
if err != nil {
return errors.Trace(err)
}
conf.Consistency, err = flags.GetString(flagConsistency)
if err != nil {
return errors.Trace(err)
}
conf.Snapshot, err = flags.GetString(flagSnapshot)
if err != nil {
return errors.Trace(err)
}
conf.NoViews, err = flags.GetBool(flagNoViews)
if err != nil {
return errors.Trace(err)
}
conf.NoSequences, err = flags.GetBool(flagNoSequences)
if err != nil {
return errors.Trace(err)
}
conf.SortByPk, err = flags.GetBool(flagSortByPk)
if err != nil {
return errors.Trace(err)
}
conf.StatusAddr, err = flags.GetString(flagStatusAddr)
if err != nil {
return errors.Trace(err)
}
conf.Rows, err = flags.GetUint64(flagRows)
if err != nil {
return errors.Trace(err)
}
conf.Where, err = flags.GetString(flagWhere)
if err != nil {
return errors.Trace(err)
}
conf.EscapeBackslash, err = flags.GetBool(flagEscapeBackslash)
if err != nil {
return errors.Trace(err)
}
conf.FileType, err = flags.GetString(flagFiletype)
if err != nil {
return errors.Trace(err)
}
conf.NoHeader, err = flags.GetBool(flagNoHeader)
if err != nil {
return errors.Trace(err)
}
conf.NoSchemas, err = flags.GetBool(flagNoSchemas)
if err != nil {
return errors.Trace(err)
}
conf.NoData, err = flags.GetBool(flagNoData)
if err != nil {
return errors.Trace(err)
}
conf.CsvNullValue, err = flags.GetString(flagCsvNullValue)
if err != nil {
return errors.Trace(err)
}
conf.SQL, err = flags.GetString(flagSQL)
if err != nil {
return errors.Trace(err)
}
conf.DumpEmptyDatabase, err = flags.GetBool(flagDumpEmptyDatabase)
if err != nil {
return errors.Trace(err)
}
conf.Security.CAPath, err = flags.GetString(flagCA)
if err != nil {
return errors.Trace(err)
}
conf.Security.CertPath, err = flags.GetString(flagCert)
if err != nil {
return errors.Trace(err)
}
conf.Security.KeyPath, err = flags.GetString(flagKey)
if err != nil {
return errors.Trace(err)
}
conf.CsvSeparator, err = flags.GetString(flagCsvSeparator)
if err != nil {
return errors.Trace(err)
}
conf.CsvDelimiter, err = flags.GetString(flagCsvDelimiter)
if err != nil {
return errors.Trace(err)
}
conf.CsvLineTerminator, err = flags.GetString(flagCsvLineTerminator)
if err != nil {
return errors.Trace(err)
}
conf.CompleteInsert, err = flags.GetBool(flagCompleteInsert)
if err != nil {
return errors.Trace(err)
}
conf.ReadTimeout, err = flags.GetDuration(flagReadTimeout)
if err != nil {
return errors.Trace(err)
}
conf.TransactionalConsistency, err = flags.GetBool(flagTransactionalConsistency)
if err != nil {
return errors.Trace(err)
}
conf.TiDBMemQuotaQuery, err = flags.GetUint64(flagTidbMemQuotaQuery)
if err != nil {
return errors.Trace(err)
}
if conf.Threads <= 0 {
return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
}
if len(conf.CsvSeparator) == 0 {
return errors.New("--csv-separator is set to \"\". It must not be an empty string")
}
if conf.SessionParams == nil {
conf.SessionParams = make(map[string]any)
}
tablesList, err := flags.GetStringSlice(flagTablesList)
if err != nil {
return errors.Trace(err)
}
fileSizeStr, err := flags.GetString(flagFilesize)
if err != nil {
return errors.Trace(err)
}
filters, err := flags.GetStringSlice(flagFilter)
if err != nil {
return errors.Trace(err)
}
caseSensitive, err := flags.GetBool(flagCaseSensitive)
if err != nil {
return errors.Trace(err)
}
outputFilenameFormat, err := flags.GetString(flagOutputFilenameTemplate)
if err != nil {
return errors.Trace(err)
}
params, err := flags.GetStringToString(flagParams)
if err != nil {
return errors.Trace(err)
}
conf.SpecifiedTables = len(tablesList) > 0
conf.Tables, err = GetConfTables(tablesList)
if err != nil {
return errors.Trace(err)
}
conf.TableFilter, err = ParseTableFilter(tablesList, filters)
if err != nil {
return errors.Errorf("failed to parse filter: %s", err)
}
if !caseSensitive {
conf.TableFilter = filter.CaseInsensitive(conf.TableFilter)
}
conf.FileSize, err = ParseFileSize(fileSizeStr)
if err != nil {
return errors.Trace(err)
}
if outputFilenameFormat == "" && conf.SQL != "" {
outputFilenameFormat = DefaultAnonymousOutputFileTemplateText
}
tmpl, err := ParseOutputFileTemplate(outputFilenameFormat)
if err != nil {
return errors.Errorf("failed to parse output filename template (--output-filename-template '%s')", outputFilenameFormat)
}
conf.OutputFileTemplate = tmpl
compressType, err := flags.GetString(flagCompress)
if err != nil {
return errors.Trace(err)
}
conf.CompressType, err = compressedio.ParseCompressType(compressType)
if err != nil {
return errors.Trace(err)
}
dialect, err := flags.GetString(flagCsvOutputDialect)
if err != nil {
return errors.Trace(err)
}
if dialect != "" && conf.FileType != "csv" {
return errors.Errorf("%s is only supported when dumping whole table to csv, not compatible with %s", flagCsvOutputDialect, conf.FileType)
}
conf.CsvOutputDialect, err = ParseOutputDialect(dialect)
if err != nil {
return errors.Trace(err)
}
for k, v := range params {
conf.SessionParams[k] = v
}
err = conf.BackendOptions.ParseFromFlags(pflag.CommandLine)
if err != nil {
return errors.Trace(err)
}
return nil
}
// ParseFileSize parses file size from tables-list and filter arguments
func ParseFileSize(fileSizeStr string) (uint64, error) {
if len(fileSizeStr) == 0 {
return UnspecifiedSize, nil
} else if fileSizeMB, err := strconv.ParseUint(fileSizeStr, 10, 64); err == nil {
fmt.Printf("Warning: -F without unit is not recommended, try using `-F '%dMiB'` in the future\n", fileSizeMB)
return fileSizeMB * units.MiB, nil
} else if size, err := units.RAMInBytes(fileSizeStr); err == nil {
return uint64(size), nil
}
return 0, errors.Errorf("failed to parse filesize (-F '%s')", fileSizeStr)
}
// ParseTableFilter parses table filter from tables-list and filter arguments
func ParseTableFilter(tablesList, filters []string) (filter.Filter, error) {
if len(tablesList) == 0 {
return filter.Parse(filters)
}
// only parse -T when -f is default value. otherwise bail out.
if !slices.Equal(filters, []string{"*.*", DefaultTableFilter}) {
return nil, errors.New("cannot pass --tables-list and --filter together")
}
tableNames := make([]filter.Table, 0, len(tablesList))
for _, table := range tablesList {
parts := strings.SplitN(table, ".", 2)
if len(parts) < 2 {
return nil, errors.Errorf("--tables-list only accepts qualified table names, but `%s` lacks a dot", table)
}
tableNames = append(tableNames, filter.Table{Schema: parts[0], Name: parts[1]})
}
return filter.NewTablesFilter(tableNames...), nil
}
// GetConfTables parses tables from tables-list and filter arguments
func GetConfTables(tablesList []string) (DatabaseTables, error) {
dbTables := DatabaseTables{}
var (
tablename string
avgRowLength uint64
)
avgRowLength = 0
for _, tablename = range tablesList {
parts := strings.SplitN(tablename, ".", 2)
if len(parts) < 2 {
return nil, errors.Errorf("--tables-list only accepts qualified table names, but `%s` lacks a dot", tablename)
}
dbName := parts[0]
tbName := parts[1]
dbTables[dbName] = append(dbTables[dbName], &TableInfo{tbName, avgRowLength, TableTypeBase})
}
return dbTables, nil
}
// ParseOutputDialect parses output dialect string to Dialect
func ParseOutputDialect(outputDialect string) (CSVDialect, error) {
switch outputDialect {
case "", "default":
return CSVDialectDefault, nil
case "snowflake":
return CSVDialectSnowflake, nil
case "redshift":
return CSVDialectRedshift, nil
case "bigquery":
return CSVDialectBigQuery, nil
default:
return CSVDialectDefault, errors.Errorf("unknown output dialect %s", outputDialect)
}
}
func (conf *Config) createExternalStorage(ctx context.Context) (storeapi.Storage, error) {
if conf.ExtStorage != nil {
return conf.ExtStorage, nil
}
b, err := objstore.ParseBackend(conf.OutputDirPath, &conf.BackendOptions)
if err != nil {
return nil, errors.Trace(err)
}
// TODO: support setting httpClient with certification later
return objstore.New(ctx, b, &storeapi.Options{})
}
const (
// UnspecifiedSize means the filesize/statement-size is unspecified
UnspecifiedSize = 0
// DefaultStatementSize is the default statement size
DefaultStatementSize = 1000000
// TiDBMemQuotaQueryName is the session variable TiDBMemQuotaQuery's name in TiDB
TiDBMemQuotaQueryName = "tidb_mem_quota_query"
// DefaultTableFilter is the default exclude table filter. It will exclude all system databases
DefaultTableFilter = "!/^(mysql|sys|INFORMATION_SCHEMA|PERFORMANCE_SCHEMA|METRICS_SCHEMA|INSPECTION_SCHEMA)$/.*"
defaultTaskChannelCapacity = 128
defaultDumpGCSafePointTTL = 5 * 60
defaultEtcdDialTimeOut = 3 * time.Second
// LooseCollationCompatible is used in DM, represents a collation setting for best compatibility.
LooseCollationCompatible = "loose"
// StrictCollationCompatible is used in DM, represents a collation setting for correctness.
StrictCollationCompatible = "strict"
dumplingServiceSafePointPrefix = "dumpling"
)
var (
decodeRegionVersion = semver.New("3.0.0")
gcSafePointVersion = semver.New("4.0.0")
tableSampleVersion = semver.New("5.0.0-nightly")
minNewTerminologyMySQL = semver.New("8.4.0") // first MySQL version to no longer support MASTER/SLAVE/etc
)
func adjustConfig(conf *Config, fns ...func(*Config) error) error {
for _, f := range fns {
err := f(conf)
if err != nil {
return err
}
}
return nil
}
func buildTLSConfig(conf *Config) error {
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(conf.Security.CAPath),
util.WithCertAndKeyPath(conf.Security.CertPath, conf.Security.KeyPath),
util.WithCAContent(conf.Security.SSLCABytes),
util.WithCertAndKeyContent(conf.Security.SSLCertBytes, conf.Security.SSLKeyBytes),
util.WithMinTLSVersion(conf.MinTLSVersion),
)
if err != nil {
return errors.Trace(err)
}
conf.Security.TLS = tlsConfig
return nil
}
func validateSpecifiedSQL(conf *Config) error {
if conf.SQL != "" && conf.Where != "" {
return errors.New("can't specify both --sql and --where at the same time. Please try to combine them into --sql")
}
return nil
}
func adjustFileFormat(conf *Config) error {
conf.FileType = strings.ToLower(conf.FileType)
switch conf.FileType {
case "":
if conf.SQL != "" {
conf.FileType = FileFormatCSVString
} else {
conf.FileType = FileFormatSQLTextString
}
case FileFormatSQLTextString:
if conf.SQL != "" {
return errors.Errorf("unsupported config.FileType '%s' when we specify --sql, please unset --filetype or set it to 'csv'", conf.FileType)
}
case FileFormatCSVString:
default:
return errors.Errorf("unknown config.FileType '%s'", conf.FileType)
}
return nil
}
func matchMysqlBugversion(info version.ServerInfo) bool {
// if 8.0.3 <= mysql8 version < 8.0.23
// FLUSH TABLES WITH READ LOCK could block other sessions from executing SHOW TABLE STATUS.
// see more in https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
if info.ServerType != version.ServerTypeMySQL {
return false
}
currentVersion := info.ServerVersion
bugVersionStart := semver.New("8.0.2")
bugVersionEnd := semver.New("8.0.23")
return bugVersionStart.LessThan(*currentVersion) && currentVersion.LessThan(*bugVersionEnd)
}