// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package export
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/DATA-DOG/go-sqlmock"
|
|
"github.com/coreos/go-semver/semver"
|
|
"github.com/go-sql-driver/mysql"
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/br/pkg/version"
|
|
tcontext "github.com/pingcap/tidb/dumpling/context"
|
|
"github.com/pingcap/tidb/pkg/parser"
|
|
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
|
|
"github.com/pingcap/tidb/pkg/util/promutil"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func TestDumpExit(t *testing.T) {
|
|
db, mock, err := sqlmock.New()
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, db.Close())
|
|
}()
|
|
|
|
mock.ExpectQuery(fmt.Sprintf("SHOW CREATE DATABASE `%s`", escapeString(database))).
|
|
WillDelayFor(time.Second).
|
|
WillReturnRows(sqlmock.NewRows([]string{"Database", "Create Database"}).
|
|
AddRow("test", "CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */"))
|
|
mock.ExpectQuery(fmt.Sprintf("SELECT DEFAULT_COLLATION_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s'", escapeString(database))).
|
|
WillReturnRows(sqlmock.NewRows([]string{"DEFAULT_COLLATION_NAME"}).
|
|
AddRow("utf8mb4_bin"))
|
|
|
|
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
|
|
defer cancel()
|
|
conn, err := db.Conn(tctx)
|
|
require.NoError(t, err)
|
|
baseConn := newBaseConn(conn, true, nil)
|
|
|
|
d := &Dumper{
|
|
tctx: tctx,
|
|
conf: DefaultConfig(),
|
|
cancelCtx: cancel,
|
|
}
|
|
wg, writingCtx := errgroup.WithContext(tctx)
|
|
writerErr := errors.New("writer error")
|
|
|
|
wg.Go(func() error {
|
|
return errors.Trace(writerErr)
|
|
})
|
|
wg.Go(func() error {
|
|
time.Sleep(time.Second)
|
|
return context.Canceled
|
|
})
|
|
|
|
writerCtx := tctx.WithContext(writingCtx)
|
|
taskChan := make(chan Task, 1)
|
|
taskChan <- &TaskDatabaseMeta{}
|
|
d.conf.Tables = DatabaseTables{}.AppendTable(database, nil)
|
|
d.conf.ServerInfo.ServerType = version.ServerTypeMySQL
|
|
require.ErrorIs(t, wg.Wait(), writerErr)
|
|
// if writerCtx is canceled , QuerySQL in `dumpDatabases` will return sqlmock.ErrCancelled
|
|
require.ErrorIs(t, d.dumpDatabases(writerCtx, baseConn, taskChan), sqlmock.ErrCancelled)
|
|
}
|
|
|
|
func TestDumpTableMeta(t *testing.T) {
|
|
db, mock, err := sqlmock.New()
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, db.Close())
|
|
}()
|
|
|
|
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
|
|
defer cancel()
|
|
conn, err := db.Conn(tctx)
|
|
require.NoError(t, err)
|
|
baseConn := newBaseConn(conn, true, nil)
|
|
|
|
conf := DefaultConfig()
|
|
conf.NoSchemas = true
|
|
|
|
for serverType := version.ServerTypeUnknown; serverType < version.ServerTypeAll; serverType++ {
|
|
conf.ServerInfo.ServerType = version.ServerType(serverType)
|
|
hasImplicitRowID := false
|
|
mock.ExpectQuery("SHOW COLUMNS FROM").
|
|
WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}).
|
|
AddRow("id", "int(11)", "NO", "PRI", nil, ""))
|
|
if serverType == version.ServerTypeTiDB {
|
|
mock.ExpectExec("SELECT _tidb_rowid from").
|
|
WillReturnResult(sqlmock.NewResult(0, 0))
|
|
hasImplicitRowID = true
|
|
}
|
|
mock.ExpectQuery(fmt.Sprintf("SELECT \\* FROM `%s`.`%s`", database, table)).
|
|
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
|
|
meta, err := dumpTableMeta(tctx, conf, baseConn, database, &TableInfo{Type: TableTypeBase, Name: table})
|
|
require.NoError(t, err)
|
|
require.Equal(t, database, meta.DatabaseName())
|
|
require.Equal(t, table, meta.TableName())
|
|
require.Equal(t, "*", meta.SelectedField())
|
|
require.Equal(t, 1, meta.SelectedLen())
|
|
require.Equal(t, "", meta.ShowCreateTable())
|
|
require.Equal(t, hasImplicitRowID, meta.HasImplicitRowID())
|
|
}
|
|
}
|
|
|
|
func TestGetListTableTypeByConf(t *testing.T) {
|
|
conf := defaultConfigForTest(t)
|
|
cases := []struct {
|
|
serverInfo version.ServerInfo
|
|
consistency string
|
|
expected listTableType
|
|
}{
|
|
{version.ParseServerInfo("5.7.25-TiDB-3.0.6"), ConsistencyTypeSnapshot, listTableByShowTableStatus},
|
|
// no bug version
|
|
{version.ParseServerInfo("8.0.2"), ConsistencyTypeLock, listTableByInfoSchema},
|
|
{version.ParseServerInfo("8.0.2"), ConsistencyTypeFlush, listTableByShowTableStatus},
|
|
{version.ParseServerInfo("8.0.23"), ConsistencyTypeNone, listTableByShowTableStatus},
|
|
|
|
// bug version
|
|
{version.ParseServerInfo("8.0.3"), ConsistencyTypeLock, listTableByInfoSchema},
|
|
{version.ParseServerInfo("8.0.3"), ConsistencyTypeFlush, listTableByShowFullTables},
|
|
{version.ParseServerInfo("8.0.3"), ConsistencyTypeNone, listTableByShowTableStatus},
|
|
}
|
|
|
|
for _, x := range cases {
|
|
conf.Consistency = x.consistency
|
|
conf.ServerInfo = x.serverInfo
|
|
require.Equalf(t, x.expected, getListTableTypeByConf(conf), "server info: %s, consistency: %s", x.serverInfo, x.consistency)
|
|
}
|
|
}
|
|
|
|
func TestAdjustDatabaseCollation(t *testing.T) {
|
|
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
|
|
defer cancel()
|
|
parser1 := parser.New()
|
|
|
|
originSQLs := []string{
|
|
"create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"create database `test` CHARACTER SET=utf8mb4",
|
|
}
|
|
|
|
expectedSQLs := []string{
|
|
"create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
|
|
}
|
|
charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
|
|
|
|
for _, originSQL := range originSQLs {
|
|
newSQL, err := adjustDatabaseCollation(tctx, LooseCollationCompatible, parser1, originSQL, charsetAndDefaultCollationMap)
|
|
require.NoError(t, err)
|
|
require.Equal(t, originSQL, newSQL)
|
|
}
|
|
|
|
for i, originSQL := range originSQLs {
|
|
newSQL, err := adjustDatabaseCollation(tctx, StrictCollationCompatible, parser1, originSQL, charsetAndDefaultCollationMap)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedSQLs[i], newSQL)
|
|
}
|
|
}
|
|
|
|
func TestAdjustTableCollation(t *testing.T) {
|
|
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
|
|
defer cancel()
|
|
|
|
parser1 := parser.New()
|
|
|
|
originSQLs := []string{
|
|
"create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"create table `test`.`t1` (id int) CHARSET=utf8mb4",
|
|
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ",
|
|
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
|
|
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 ",
|
|
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4",
|
|
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4",
|
|
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4",
|
|
"create table `test`.`t1` (name varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin) CHARSET=latin1 COLLATE=latin1_bin",
|
|
}
|
|
|
|
expectedSQLs := []string{
|
|
"CREATE TABLE `test`.`t1` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
|
|
"CREATE TABLE `test`.`t1` (`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_bin) DEFAULT CHARACTER SET = LATIN1 DEFAULT COLLATE = LATIN1_BIN",
|
|
}
|
|
|
|
charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
|
|
|
|
for _, originSQL := range originSQLs {
|
|
newSQL, err := adjustTableCollation(tctx, LooseCollationCompatible, parser1, originSQL, charsetAndDefaultCollationMap)
|
|
require.NoError(t, err)
|
|
require.Equal(t, originSQL, newSQL)
|
|
}
|
|
|
|
for i, originSQL := range originSQLs {
|
|
newSQL, err := adjustTableCollation(tctx, StrictCollationCompatible, parser1, originSQL, charsetAndDefaultCollationMap)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedSQLs[i], newSQL)
|
|
}
|
|
}
|
|
|
|
func TestUnregisterMetrics(t *testing.T) {
|
|
ctx := context.Background()
|
|
conf := &Config{
|
|
SQL: "not empty",
|
|
Where: "not empty",
|
|
PromFactory: promutil.NewDefaultFactory(),
|
|
PromRegistry: promutil.NewDefaultRegistry(),
|
|
}
|
|
|
|
_, err := NewDumper(ctx, conf)
|
|
require.Error(t, err)
|
|
_, err = NewDumper(ctx, conf)
|
|
// should not panic
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestSetDefaultSessionParams(t *testing.T) {
|
|
testCases := []struct {
|
|
si version.ServerInfo
|
|
sessionParams map[string]any
|
|
expectedParams map[string]any
|
|
}{
|
|
{
|
|
si: version.ServerInfo{
|
|
ServerType: version.ServerTypeTiDB,
|
|
HasTiKV: true,
|
|
ServerVersion: semver.New("6.1.0"),
|
|
},
|
|
sessionParams: map[string]any{
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
expectedParams: map[string]any{
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
},
|
|
{
|
|
si: version.ServerInfo{
|
|
ServerType: version.ServerTypeTiDB,
|
|
HasTiKV: true,
|
|
ServerVersion: semver.New("6.2.0"),
|
|
},
|
|
sessionParams: map[string]any{
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
expectedParams: map[string]any{
|
|
"tidb_enable_paging": "ON",
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
},
|
|
{
|
|
si: version.ServerInfo{
|
|
ServerType: version.ServerTypeTiDB,
|
|
HasTiKV: true,
|
|
ServerVersion: semver.New("6.2.0"),
|
|
},
|
|
sessionParams: map[string]any{
|
|
"tidb_enable_paging": "OFF",
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
expectedParams: map[string]any{
|
|
"tidb_enable_paging": "OFF",
|
|
"tidb_snapshot": "2020-01-01 00:00:00",
|
|
},
|
|
},
|
|
{
|
|
si: version.ServerInfo{
|
|
ServerType: version.ServerTypeMySQL,
|
|
ServerVersion: semver.New("8.0.32"),
|
|
},
|
|
sessionParams: map[string]any{},
|
|
expectedParams: map[string]any{},
|
|
},
|
|
}
|
|
|
|
for _, testCase := range testCases {
|
|
setDefaultSessionParams(testCase.si, testCase.sessionParams)
|
|
require.Equal(t, testCase.expectedParams, testCase.sessionParams)
|
|
}
|
|
}
|
|
|
|
func TestSetSessionParams(t *testing.T) {
|
|
// case 1: fail to set tidb_snapshot, should return error with hint
|
|
db, mock, err := sqlmock.New()
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, db.Close())
|
|
}()
|
|
|
|
mock.ExpectQuery("SELECT @@tidb_config").
|
|
WillReturnError(errors.New("mock error"))
|
|
mock.ExpectQuery("SELECT COUNT\\(1\\) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'").
|
|
WillReturnError(errors.New("mock error"))
|
|
tikvErr := &mysql.MySQLError{
|
|
Number: 1105,
|
|
Message: "can not get 'tikv_gc_safe_point'",
|
|
}
|
|
mock.ExpectExec("SET SESSION tidb_snapshot").
|
|
WillReturnError(tikvErr)
|
|
|
|
testfailpoint.Enable(t, "github.com/pingcap/tidb/dumpling/export/SkipResetDB", "return(true)")
|
|
|
|
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
|
|
defer cancel()
|
|
|
|
conf := DefaultConfig()
|
|
conf.ServerInfo = version.ServerInfo{
|
|
ServerType: version.ServerTypeTiDB,
|
|
HasTiKV: false,
|
|
}
|
|
conf.Snapshot = "439153276059648000"
|
|
conf.Consistency = ConsistencyTypeSnapshot
|
|
d := &Dumper{
|
|
tctx: tctx,
|
|
conf: conf,
|
|
cancelCtx: cancel,
|
|
dbHandle: db,
|
|
}
|
|
err = setSessionParam(d)
|
|
require.ErrorContains(t, err, "consistency=none")
|
|
|
|
// case 2: fail to set other
|
|
conf.ServerInfo = version.ServerInfo{
|
|
ServerType: version.ServerTypeMySQL,
|
|
HasTiKV: false,
|
|
}
|
|
conf.Snapshot = ""
|
|
conf.Consistency = ConsistencyTypeFlush
|
|
conf.SessionParams = map[string]any{
|
|
"mock": "UTC",
|
|
}
|
|
d.dbHandle = db
|
|
mock.ExpectExec("SET SESSION mock").
|
|
WillReturnError(errors.New("Unknown system variable mock"))
|
|
mock.ExpectClose()
|
|
mock.ExpectClose()
|
|
|
|
err = setSessionParam(d)
|
|
require.NoError(t, err)
|
|
}
|