// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"encoding/hex"
"fmt"
"math/big"
"net"
"slices"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
// import mysql driver
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pclog "github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/dumpling/cli"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/dumpling/log"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/caller"
gatomic "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var openDBFunc = openDB

var errEmptyHandleVals = errors.New("empty handleVals for TiDB table")

// For TiDB v6.2.0 and later, we always enable tidb_enable_paging by default.
// see https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_enable_paging-%E4%BB%8E-v540-%E7%89%88%E6%9C%AC%E5%BC%80%E5%A7%8B%E5%BC%95%E5%85%A5
var enablePagingVersion = semver.New("6.2.0")
|
|
|
|
// Dumper is the dump progress structure
|
|
type Dumper struct {
|
|
tctx *tcontext.Context
|
|
cancelCtx context.CancelFunc
|
|
conf *Config
|
|
metrics *metrics
|
|
|
|
extStore storeapi.Storage
|
|
dbHandle *sql.DB
|
|
|
|
tidbPDClientForGC pd.Client
|
|
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
|
|
totalTables int64
|
|
charsetAndDefaultCollationMap map[string]string
|
|
|
|
speedRecorder *SpeedRecorder
|
|
}
|
|
|
|
// NewDumper returns a new Dumper
|
|
func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
|
|
failpoint.Inject("setExtStorage", func(val failpoint.Value) {
|
|
path := val.(string)
|
|
b, err := objstore.ParseBackend(path, nil)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
s, err := objstore.New(context.Background(), b, &storeapi.Options{})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
conf.ExtStorage = s
|
|
})
|
|
|
|
tctx, cancelFn := tcontext.Background().WithContext(ctx).WithCancel()
|
|
d := &Dumper{
|
|
tctx: tctx,
|
|
conf: conf,
|
|
cancelCtx: cancelFn,
|
|
selectTiDBTableRegionFunc: selectTiDBTableRegion,
|
|
speedRecorder: NewSpeedRecorder(),
|
|
}
|
|
|
|
var err error
|
|
|
|
d.metrics = newMetrics(conf.PromFactory, conf.Labels)
|
|
d.metrics.registerTo(conf.PromRegistry)
|
|
defer func() {
|
|
if err != nil {
|
|
d.metrics.unregisterFrom(conf.PromRegistry)
|
|
}
|
|
}()
|
|
|
|
err = adjustConfig(conf,
|
|
buildTLSConfig,
|
|
validateSpecifiedSQL,
|
|
adjustFileFormat)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
failpoint.Inject("SetIOTotalBytes", func(_ failpoint.Value) {
|
|
d.conf.IOTotalBytes = gatomic.NewUint64(0)
|
|
d.conf.Net = uuid.New().String()
|
|
go func() {
|
|
for {
|
|
time.Sleep(10 * time.Millisecond)
|
|
d.tctx.L().Logger.Info("IOTotalBytes", zap.Uint64("IOTotalBytes", d.conf.IOTotalBytes.Load()))
|
|
}
|
|
}()
|
|
})
|
|
|
|
err = runSteps(d,
|
|
initLogger,
|
|
createExternalStore,
|
|
startHTTPService,
|
|
openSQLDB,
|
|
detectServerInfo,
|
|
resolveAutoConsistency,
|
|
|
|
validateResolveAutoConsistency,
|
|
tidbSetPDClientForGC,
|
|
tidbGetSnapshot,
|
|
tidbStartGCSavepointUpdateService,
|
|
|
|
setSessionParam)
|
|
return d, err
|
|
}
|
|
|
|
// Dump dumps the tables from the database
|
|
// nolint: gocyclo
|
|
func (d *Dumper) Dump() (dumpErr error) {
|
|
initColTypeRowReceiverMap()
|
|
var (
|
|
conn *sql.Conn
|
|
err error
|
|
conCtrl ConsistencyController
|
|
)
|
|
tctx, conf, pool := d.tctx, d.conf, d.dbHandle
|
|
tctx.L().Info("begin to run Dump", zap.Stringer("conf", conf))
|
|
m := newGlobalMetadata(tctx, d.extStore, conf.Snapshot)
|
|
repeatableRead := needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency)
|
|
defer func() {
|
|
if dumpErr == nil {
|
|
_ = m.writeGlobalMetaData()
|
|
}
|
|
}()
|
|
|
|
// for consistency lock, we should get table list at first to generate the lock tables SQL
|
|
if conf.Consistency == ConsistencyTypeLock {
|
|
conn, err = createConnWithConsistency(tctx, pool, repeatableRead)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if err = prepareTableListToDump(tctx, conf, conn); err != nil {
|
|
_ = conn.Close()
|
|
return err
|
|
}
|
|
_ = conn.Close()
|
|
}
|
|
|
|
conCtrl, err = NewConsistencyController(tctx, conf, pool)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = conCtrl.Setup(tctx); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
// Make sure the consistency lock is eventually released even if an error occurs
|
|
defer func() {
|
|
err = conCtrl.TearDown(tctx)
|
|
if err != nil {
|
|
tctx.L().Warn("fail to tear down consistency controller", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
metaConn, err := createConnWithConsistency(tctx, pool, repeatableRead)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
_ = metaConn.Close()
|
|
}()
|
|
m.recordStartTime(time.Now())
|
|
// for consistency lock, we can write the snapshot info after all tables are locked.
// the binlog position may still change because writes can happen between locking the tables and recording the master status,
// but replication of the locked tables that starts from this metadata is safe.
// for consistency flush, record the snapshot after all tables are locked. The recorded meta info is exactly the locked snapshot.
// for consistency snapshot, we should use the snapshot that we get/set at first in metadata. TiDB will assure the snapshot of TSO.
// for consistency none, the binlog position in metadata might be earlier than the dumped data. We need to enable safe-mode to assure data safety.
|
|
err = m.recordGlobalMetaData(metaConn, conf.ServerInfo, false)
|
|
if err != nil {
|
|
tctx.L().Warn("get global metadata failed", log.ShortError(err))
|
|
}
|
|
|
|
if d.conf.CollationCompatible == StrictCollationCompatible {
|
|
// init the charset and default collation map
|
|
d.charsetAndDefaultCollationMap, err = GetCharsetAndDefaultCollation(tctx.Context, metaConn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
|
|
if conf.Consistency != ConsistencyTypeLock {
|
|
if err = prepareTableListToDump(tctx, conf, metaConn); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err = d.renewSelectTableRegionFuncForLowerTiDB(tctx); err != nil {
|
|
tctx.L().Info("cannot update select table region info for TiDB", log.ShortError(err))
|
|
}
|
|
|
|
atomic.StoreInt64(&d.totalTables, int64(calculateTableCount(conf.Tables)))
|
|
|
|
rebuildMetaConn := func(conn *sql.Conn, updateMeta bool) (*sql.Conn, error) {
|
|
_ = conn.Raw(func(any) error {
|
|
// return an `ErrBadConn` so the connection is closed and not put back into the pool.
// if we used `Close` instead, the connection would always be put back into the pool.
|
|
return driver.ErrBadConn
|
|
})
|
|
|
|
newConn, err1 := createConnWithConsistency(tctx, pool, repeatableRead)
|
|
if err1 != nil {
|
|
return conn, errors.Trace(err1)
|
|
}
|
|
conn = newConn
|
|
// renew the master status after reconnecting. DM can't exit safe mode until it reaches the current position
|
|
if updateMeta && conf.PosAfterConnect {
|
|
err1 = m.recordGlobalMetaData(conn, conf.ServerInfo, true)
|
|
if err1 != nil {
|
|
return conn, errors.Trace(err1)
|
|
}
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
rebuildConn := func(conn *sql.Conn, updateMeta bool) (*sql.Conn, error) {
|
|
// make sure that the lock connection is still alive
|
|
err1 := conCtrl.PingContext(tctx)
|
|
if err1 != nil {
|
|
return conn, errors.Trace(err1)
|
|
}
|
|
return rebuildMetaConn(conn, updateMeta)
|
|
}
|
|
|
|
chanSize := defaultTaskChannelCapacity
|
|
failpoint.Inject("SmallDumpChanSize", func() {
|
|
chanSize = 1
|
|
})
|
|
taskIn, taskOut := infiniteChan[Task]()
|
|
// todo: refine metrics
|
|
AddGauge(d.metrics.taskChannelCapacity, float64(chanSize))
|
|
wg, writingCtx := errgroup.WithContext(tctx)
|
|
writerCtx := tctx.WithContext(writingCtx)
|
|
writers, tearDownWriters, err := d.startWriters(writerCtx, wg, taskOut, rebuildConn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tearDownWriters()
|
|
|
|
if conf.TransactionalConsistency {
|
|
if conf.Consistency == ConsistencyTypeFlush || conf.Consistency == ConsistencyTypeLock {
|
|
tctx.L().Info("All the dumping transactions have started. Start to unlock tables")
|
|
}
|
|
if err = conCtrl.TearDown(tctx); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
// Inject consistency failpoint test after we release the table lock
|
|
failpoint.Inject("ConsistencyCheck", nil)
|
|
|
|
if conf.PosAfterConnect {
|
|
// record again, to provide a location to exit safe mode for DM
|
|
err = m.recordGlobalMetaData(metaConn, conf.ServerInfo, true)
|
|
if err != nil {
|
|
tctx.L().Info("get global metadata (after connection pool established) failed", log.ShortError(err))
|
|
}
|
|
}
|
|
|
|
summary.SetLogCollector(summary.NewLogCollector(tctx.L().Info))
|
|
summary.SetUnit(summary.BackupUnit)
|
|
defer summary.Summary(summary.BackupUnit)
|
|
|
|
logProgressCtx, logProgressCancel := tctx.WithCancel()
|
|
go d.runLogProgress(logProgressCtx)
|
|
defer logProgressCancel()
|
|
|
|
tableDataStartTime := time.Now()
|
|
|
|
failpoint.Inject("PrintTiDBMemQuotaQuery", func(_ failpoint.Value) {
|
|
row := d.dbHandle.QueryRowContext(tctx, "select @@tidb_mem_quota_query;")
|
|
var s string
|
|
err = row.Scan(&s)
|
|
if err != nil {
|
|
fmt.Println(errors.Trace(err))
|
|
} else {
|
|
fmt.Printf("tidb_mem_quota_query == %s\n", s)
|
|
}
|
|
})
|
|
baseConn := newBaseConn(metaConn, true, rebuildMetaConn)
|
|
|
|
if conf.SQL == "" {
|
|
if err = d.dumpDatabases(writerCtx, baseConn, taskIn); err != nil && !errors.ErrorEqual(err, context.Canceled) {
|
|
return err
|
|
}
|
|
} else {
|
|
d.dumpSQL(writerCtx, baseConn, taskIn)
|
|
}
|
|
d.metrics.progressReady.Store(true)
|
|
close(taskIn)
|
|
failpoint.Inject("EnableLogProgress", func() {
|
|
time.Sleep(1 * time.Second)
|
|
tctx.L().Debug("progress ready, sleep 1s")
|
|
})
|
|
_ = baseConn.DBConn.Close()
|
|
if err := wg.Wait(); err != nil {
|
|
summary.CollectFailureUnit("dump table data", err)
|
|
return errors.Trace(err)
|
|
}
|
|
summary.CollectSuccessUnit("dump cost", countTotalTask(writers), time.Since(tableDataStartTime))
|
|
|
|
summary.SetSuccessStatus(true)
|
|
m.recordFinishTime(time.Now())
|
|
return nil
|
|
}
|
|
|
|
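// startWriters creates one writer per configured thread, each with its own connection,
// and runs them on the errgroup. It returns the writers together with a tear-down
// function that closes their connections.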
func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskChan <-chan Task,
|
|
rebuildConnFn func(*sql.Conn, bool) (*sql.Conn, error)) ([]*Writer, func(), error) {
|
|
conf, pool := d.conf, d.dbHandle
|
|
writers := make([]*Writer, conf.Threads)
|
|
for i := range conf.Threads {
|
|
conn, err := createConnWithConsistency(tctx, pool, needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency))
|
|
if err != nil {
|
|
return nil, func() {}, err
|
|
}
|
|
writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics)
|
|
writer.rebuildConnFn = rebuildConnFn
|
|
writer.setFinishTableCallBack(func(task Task) {
|
|
if _, ok := task.(*TaskTableData); ok {
|
|
IncCounter(d.metrics.finishedTablesCounter)
|
|
// FIXME: finishing the last chunk doesn't actually mean this table is 'finished'.
// We can only call the table 'finished' once all of its chunks are finished.
// Comment out this log for now to avoid ambiguity.
|
|
// tctx.L().Debug("finished dumping table data",
|
|
// zap.String("database", td.Meta.DatabaseName()),
|
|
// zap.String("table", td.Meta.TableName()))
|
|
failpoint.Inject("EnableLogProgress", func() {
|
|
time.Sleep(1 * time.Second)
|
|
tctx.L().Debug("EnableLogProgress, sleep 1s")
|
|
})
|
|
}
|
|
})
|
|
writer.setFinishTaskCallBack(func(task Task) {
|
|
IncGauge(d.metrics.taskChannelCapacity)
|
|
if td, ok := task.(*TaskTableData); ok {
|
|
d.metrics.completedChunks.Add(1)
|
|
tctx.L().Debug("finish dumping table data task",
|
|
zap.String("database", td.Meta.DatabaseName()),
|
|
zap.String("table", td.Meta.TableName()),
|
|
zap.Int("chunkIdx", td.ChunkIndex))
|
|
}
|
|
})
|
|
wg.Go(func() error {
|
|
return writer.run(taskChan)
|
|
})
|
|
writers[i] = writer
|
|
}
|
|
tearDown := func() {
|
|
for _, w := range writers {
|
|
_ = w.conn.Close()
|
|
}
|
|
}
|
|
return writers, tearDown, nil
|
|
}
|
|
|
|
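// dumpDatabases generates the schema and data tasks for every database and table in
// conf.Tables (including TiDB placement policies) and sends them to taskChan.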
func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *BaseConn, taskChan chan<- Task) error {
|
|
conf := d.conf
|
|
allTables := conf.Tables
|
|
|
|
// placement policies should be created before databases.
// placement policies can differ on other server types, so we only handle TiDB servers here.
|
|
if conf.ServerInfo.ServerType == version.ServerTypeTiDB {
|
|
policyNames, err := ListAllPlacementPolicyNames(tctx, metaConn)
|
|
if err != nil {
|
|
errCause := errors.Cause(err)
|
|
if mysqlErr, ok := errCause.(*mysql.MySQLError); ok && mysqlErr.Number == ErrNoSuchTable {
|
|
// some old TiDB versions and other server types don't support placement rules, so we can skip it.
|
|
tctx.L().Debug("cannot dump placement policy, maybe the server doesn't support it", log.ShortError(err))
|
|
} else {
|
|
tctx.L().Warn("fail to dump placement policy: ", log.ShortError(err))
|
|
}
|
|
}
|
|
for _, policy := range policyNames {
|
|
createPolicySQL, err := ShowCreatePlacementPolicy(tctx, metaConn, policy)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
wrappedCreatePolicySQL := fmt.Sprintf("/*T![placement] %s */", createPolicySQL)
|
|
task := NewTaskPolicyMeta(policy, wrappedCreatePolicySQL)
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
parser1 := parser.New()
|
|
for dbName, tables := range allTables {
|
|
if !conf.NoSchemas {
|
|
createDatabaseSQL, err := ShowCreateDatabase(tctx, metaConn, dbName)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
// adjust db collation
|
|
createDatabaseSQL, err = adjustDatabaseCollation(tctx, d.conf.CollationCompatible, parser1, createDatabaseSQL, d.charsetAndDefaultCollationMap)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
task := NewTaskDatabaseMeta(dbName, createDatabaseSQL)
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
}
|
|
|
|
for _, table := range tables {
|
|
tctx.L().Debug("start dumping table...", zap.String("database", dbName),
|
|
zap.String("table", table.Name))
|
|
meta, err := dumpTableMeta(tctx, conf, metaConn, dbName, table)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
if !conf.NoSchemas {
|
|
switch table.Type {
|
|
case TableTypeView:
|
|
task := NewTaskViewMeta(dbName, table.Name, meta.ShowCreateTable(), meta.ShowCreateView())
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
case TableTypeSequence:
|
|
task := NewTaskSequenceMeta(dbName, table.Name, meta.ShowCreateTable())
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
default:
|
|
// adjust table collation
|
|
newCreateSQL, err := adjustTableCollation(tctx, d.conf.CollationCompatible, parser1, meta.ShowCreateTable(), d.charsetAndDefaultCollationMap)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
meta.(*tableMeta).showCreateTable = newCreateSQL
|
|
|
|
task := NewTaskTableMeta(dbName, table.Name, meta.ShowCreateTable())
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
}
|
|
}
|
|
if table.Type == TableTypeBase {
|
|
err = d.dumpTableData(tctx, metaConn, meta, taskChan)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// adjustDatabaseCollation adds an explicit default collation to the CREATE DATABASE statement and returns the new SQL
|
|
func adjustDatabaseCollation(tctx *tcontext.Context, collationCompatible string, parser *parser.Parser, originSQL string, charsetAndDefaultCollationMap map[string]string) (string, error) {
|
|
if collationCompatible != StrictCollationCompatible {
|
|
return originSQL, nil
|
|
}
|
|
stmt, err := parser.ParseOneStmt(originSQL, "", "")
|
|
if err != nil {
|
|
tctx.L().Warn("parse create database error, maybe tidb parser doesn't support it", zap.String("originSQL", originSQL), log.ShortError(err))
|
|
return originSQL, nil
|
|
}
|
|
createStmt, ok := stmt.(*ast.CreateDatabaseStmt)
|
|
if !ok {
|
|
return originSQL, nil
|
|
}
|
|
var charset string
|
|
for _, createOption := range createStmt.Options {
|
|
// already have 'Collation'
|
|
if createOption.Tp == ast.DatabaseOptionCollate {
|
|
return originSQL, nil
|
|
}
|
|
if createOption.Tp == ast.DatabaseOptionCharset {
|
|
charset = createOption.Value
|
|
}
|
|
}
|
|
// get db collation
|
|
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(charset)]
|
|
if !ok {
|
|
tctx.L().Warn("not found database charset default collation.", zap.String("originSQL", originSQL), zap.String("charset", strings.ToLower(charset)))
|
|
return originSQL, nil
|
|
}
|
|
// add collation
|
|
createStmt.Options = append(createStmt.Options, &ast.DatabaseOption{Tp: ast.DatabaseOptionCollate, Value: collation})
|
|
// rewrite sql
|
|
var b []byte
|
|
bf := bytes.NewBuffer(b)
|
|
err = createStmt.Restore(&format.RestoreCtx{
|
|
Flags: format.DefaultRestoreFlags | format.RestoreTiDBSpecialComment,
|
|
In: bf,
|
|
})
|
|
if err != nil {
|
|
return "", errors.Trace(err)
|
|
}
|
|
return bf.String(), nil
|
|
}
|
|
|
|
// adjustTableCollation adds an explicit collation to the CREATE TABLE statement and its columns, and returns the new SQL
|
|
func adjustTableCollation(tctx *tcontext.Context, collationCompatible string, parser *parser.Parser, originSQL string, charsetAndDefaultCollationMap map[string]string) (string, error) {
|
|
if collationCompatible != StrictCollationCompatible {
|
|
return originSQL, nil
|
|
}
|
|
stmt, err := parser.ParseOneStmt(originSQL, "", "")
|
|
if err != nil {
|
|
tctx.L().Warn("parse create table error, maybe tidb parser doesn't support it", zap.String("originSQL", originSQL), log.ShortError(err))
|
|
return originSQL, nil
|
|
}
|
|
createStmt, ok := stmt.(*ast.CreateTableStmt)
|
|
if !ok {
|
|
return originSQL, nil
|
|
}
|
|
var charset string
|
|
var collation string
|
|
for _, createOption := range createStmt.Options {
|
|
// already have 'Collation'
|
|
if createOption.Tp == ast.TableOptionCollate {
|
|
collation = createOption.StrValue
|
|
break
|
|
}
|
|
if createOption.Tp == ast.TableOptionCharset {
|
|
charset = createOption.StrValue
|
|
}
|
|
}
|
|
|
|
if collation == "" && charset != "" {
|
|
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(charset)]
|
|
if !ok {
|
|
tctx.L().Warn("not found table charset default collation.", zap.String("originSQL", originSQL), zap.String("charset", strings.ToLower(charset)))
|
|
return originSQL, nil
|
|
}
|
|
|
|
// add collation
|
|
createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation})
|
|
}
|
|
|
|
// adjust columns collation
|
|
adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap)
|
|
|
|
// rewrite sql
|
|
var b []byte
|
|
bf := bytes.NewBuffer(b)
|
|
err = createStmt.Restore(&format.RestoreCtx{
|
|
Flags: format.DefaultRestoreFlags | format.RestoreTiDBSpecialComment,
|
|
In: bf,
|
|
})
|
|
if err != nil {
|
|
return "", errors.Trace(err)
|
|
}
|
|
return bf.String(), nil
|
|
}
|
|
|
|
// adjustColumnsCollation adds each column's collation when only a charset is specified.
|
|
func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableStmt, charsetAndDefaultCollationMap map[string]string) {
|
|
ColumnLoop:
|
|
for _, col := range createStmt.Cols {
|
|
for _, options := range col.Options {
|
|
// already have 'Collation'
|
|
if options.Tp == ast.ColumnOptionCollate {
|
|
continue ColumnLoop
|
|
}
|
|
}
|
|
fieldType := col.Tp
|
|
if fieldType.GetCollate() != "" {
|
|
continue
|
|
}
|
|
if fieldType.GetCharset() != "" {
|
|
// just have charset
|
|
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(fieldType.GetCharset())]
|
|
if !ok {
|
|
tctx.L().Warn("not found charset default collation for column.", zap.String("table", createStmt.Table.Name.String()), zap.String("column", col.Name.String()), zap.String("charset", strings.ToLower(fieldType.GetCharset())))
|
|
continue
|
|
}
|
|
fieldType.SetCollate(collation)
|
|
}
|
|
}
|
|
}
|
|
|
|
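// dumpTableData dumps the data of a single table: sequentially when conf.Rows is
// unspecified, otherwise split into chunks of roughly conf.Rows rows.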
func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error {
|
|
conf := d.conf
|
|
if conf.NoData {
|
|
return nil
|
|
}
|
|
|
|
// Update total rows
|
|
fieldName, _ := pickupPossibleField(tctx, meta, conn)
|
|
c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf)
|
|
AddCounter(d.metrics.estimateTotalRowsCounter, float64(c))
|
|
|
|
if conf.Rows == UnspecifiedSize {
|
|
return d.sequentialDumpTable(tctx, conn, meta, taskChan)
|
|
}
|
|
return d.concurrentDumpTable(tctx, conn, meta, taskChan)
|
|
}
|
|
|
|
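// buildConcatTask splits the table into chunks the same way concurrentDumpTable does,
// then concatenates the chunk queries into one multi-query task. It returns a nil task
// when the split produces at most one chunk or the chunks are inconsistent.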
func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (*TaskTableData, error) {
|
|
tableChan := make(chan Task, 128)
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
// adjust conf.Rows to a suitable value for this table
|
|
d.conf.Rows = GetSuitableRows(meta.AvgRowLength())
|
|
err := d.concurrentDumpTable(tctx, conn, meta, tableChan)
|
|
d.conf.Rows = UnspecifiedSize
|
|
if err != nil {
|
|
errCh <- err
|
|
} else {
|
|
close(errCh)
|
|
}
|
|
}()
|
|
tableDataArr := make([]*tableData, 0)
|
|
handleSubTask := func(task Task) {
|
|
tableTask, ok := task.(*TaskTableData)
|
|
if !ok {
|
|
tctx.L().Warn("unexpected task when splitting table chunks", zap.String("task", tableTask.Brief()))
|
|
return
|
|
}
|
|
tableDataInst, ok := tableTask.Data.(*tableData)
|
|
if !ok {
|
|
tctx.L().Warn("unexpected task.Data when splitting table chunks", zap.String("task", tableTask.Brief()))
|
|
return
|
|
}
|
|
tableDataArr = append(tableDataArr, tableDataInst)
|
|
d.metrics.totalChunks.Dec()
|
|
}
|
|
for {
|
|
select {
|
|
case err, ok := <-errCh:
|
|
if !ok {
|
|
// make sure all the subtasks in tableChan are handled
|
|
for len(tableChan) > 0 {
|
|
task := <-tableChan
|
|
handleSubTask(task)
|
|
}
|
|
if len(tableDataArr) <= 1 {
|
|
return nil, nil
|
|
}
|
|
queries := make([]string, 0, len(tableDataArr))
|
|
colLen := tableDataArr[0].colLen
|
|
for _, tableDataInst := range tableDataArr {
|
|
queries = append(queries, tableDataInst.query)
|
|
if colLen != tableDataInst.colLen {
|
|
tctx.L().Warn("colLen varies for same table",
|
|
zap.Int("oldColLen", colLen),
|
|
zap.String("oldQuery", queries[0]),
|
|
zap.Int("newColLen", tableDataInst.colLen),
|
|
zap.String("newQuery", tableDataInst.query))
|
|
return nil, nil
|
|
}
|
|
}
|
|
return d.newTaskTableData(meta, newMultiQueriesChunk(queries, colLen), 0, 1), nil
|
|
}
|
|
return nil, err
|
|
case task := <-tableChan:
|
|
handleSubTask(task)
|
|
}
|
|
}
|
|
}
|
|
|
|
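// dumpWholeTableDirectly dumps the whole table (or a single partition) with one SELECT task.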
func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, meta TableMeta, taskChan chan<- Task, partition, orderByClause string, currentChunk, totalChunks int) error {
|
|
conf := d.conf
|
|
tableIR := SelectAllFromTable(conf, meta, partition, orderByClause)
|
|
task := d.newTaskTableData(meta, tableIR, currentChunk, totalChunks)
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
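// sequentialDumpTable dumps the table in a single task. For TiDB it first tries to build
// a concatenated multi-chunk task via buildConcatTask.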
func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error {
|
|
conf := d.conf
|
|
if conf.ServerInfo.ServerType == version.ServerTypeTiDB {
|
|
task, err := d.buildConcatTask(tctx, conn, meta)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if task != nil {
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
return nil
|
|
}
|
|
tctx.L().Info("didn't build tidb concat sqls, will select all from table now",
|
|
zap.String("database", meta.DatabaseName()),
|
|
zap.String("table", meta.TableName()))
|
|
}
|
|
orderByClause, err := buildOrderByClause(tctx, conf, conn, meta.DatabaseName(), meta.TableName(), meta.HasImplicitRowID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
|
|
}
|
|
|
|
// concurrentDumpTable tries to split table into several chunks to dump
|
|
func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error {
|
|
conf := d.conf
|
|
db, tbl := meta.DatabaseName(), meta.TableName()
|
|
if conf.ServerInfo.ServerType == version.ServerTypeTiDB &&
|
|
conf.ServerInfo.ServerVersion != nil &&
|
|
(conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 ||
|
|
(conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*decodeRegionVersion) >= 0)) {
|
|
err := d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan)
|
|
// don't retry on context error and successful tasks
|
|
if err2 := errors.Cause(err); err2 == nil || err2 == context.DeadlineExceeded || err2 == context.Canceled {
|
|
return err
|
|
} else if err2 != errEmptyHandleVals {
|
|
tctx.L().Info("fallback to concurrent dump tables using rows due to some problem. This won't influence the whole dump process",
|
|
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
|
|
}
|
|
}
|
|
|
|
orderByClause, err := buildOrderByClause(tctx, conf, conn, db, tbl, meta.HasImplicitRowID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
field, err := pickupPossibleField(tctx, meta, conn)
|
|
if err != nil || field == "" {
|
|
// skip the chunk-splitting logic if no proper field is found
|
|
tctx.L().Info("fallback to sequential dump due to no proper field. This won't influence the whole dump process",
|
|
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
|
|
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
|
|
}
|
|
|
|
count := estimateCount(d.tctx, db, tbl, conn, field, conf)
|
|
tctx.L().Info("get estimated rows count",
|
|
zap.String("database", db),
|
|
zap.String("table", tbl),
|
|
zap.Uint64("estimateCount", count))
|
|
if count < conf.Rows {
|
|
// skip chunk logic if estimates are low
|
|
tctx.L().Info("fallback to sequential dump due to estimate count < rows. This won't influence the whole dump process",
|
|
zap.Uint64("estimate count", count),
|
|
zap.Uint64("conf.rows", conf.Rows),
|
|
zap.String("database", db),
|
|
zap.String("table", tbl))
|
|
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
|
|
}
|
|
|
|
minv, maxv, err := d.selectMinAndMaxIntValue(tctx, conn, db, tbl, field)
|
|
if err != nil {
|
|
tctx.L().Info("fallback to sequential dump due to cannot get bounding values. This won't influence the whole dump process",
|
|
log.ShortError(err))
|
|
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
|
|
}
|
|
tctx.L().Debug("get int bounding values",
|
|
zap.String("lower", minv.String()),
|
|
zap.String("upper", maxv.String()))
|
|
|
|
// every chunk would have eventual adjustments
|
|
estimatedChunks := count / conf.Rows
|
|
estimatedStep := new(big.Int).Sub(maxv, minv).Uint64()/estimatedChunks + 1
|
|
bigEstimatedStep := new(big.Int).SetUint64(estimatedStep)
|
|
cutoff := new(big.Int).Set(minv)
|
|
totalChunks := estimatedChunks
|
|
if estimatedStep == 1 {
|
|
totalChunks = new(big.Int).Sub(maxv, minv).Uint64() + 1
|
|
}
|
|
|
|
selectField, selectLen := meta.SelectedField(), meta.SelectedLen()
|
|
|
|
chunkIndex := 0
|
|
nullValueCondition := ""
|
|
if conf.Where == "" {
|
|
nullValueCondition = fmt.Sprintf("`%s` IS NULL OR ", escapeString(field))
|
|
}
|
|
for maxv.Cmp(cutoff) >= 0 {
|
|
nextCutOff := new(big.Int).Add(cutoff, bigEstimatedStep)
|
|
where := fmt.Sprintf("%s(`%s` >= %d AND `%s` < %d)", nullValueCondition, escapeString(field), cutoff, escapeString(field), nextCutOff)
|
|
query := buildSelectQuery(db, tbl, selectField, "", buildWhereCondition(conf, where), orderByClause)
|
|
if len(nullValueCondition) > 0 {
|
|
nullValueCondition = ""
|
|
}
|
|
task := d.newTaskTableData(meta, newTableData(query, selectLen, false), chunkIndex, int(totalChunks))
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
cutoff = nextCutOff
|
|
chunkIndex++
|
|
}
|
|
return nil
|
|
}
|
|
|
|
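// sendTaskToChan sends task to taskChan and reports whether the context was done
// before the task could be sent.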
func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan<- Task) (ctxDone bool) {
|
|
select {
|
|
case <-tctx.Done():
|
|
return true
|
|
case taskChan <- task:
|
|
tctx.L().Debug("send task to writer",
|
|
zap.String("task", task.Brief()))
|
|
DecGauge(d.metrics.taskChannelCapacity)
|
|
return false
|
|
}
|
|
}
|
|
|
|
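// selectMinAndMaxIntValue queries MIN and MAX of the chosen integer field; the result
// is used as the bounds for splitting the table into chunks.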
func (d *Dumper) selectMinAndMaxIntValue(tctx *tcontext.Context, conn *BaseConn, db, tbl, field string) (minv, maxv *big.Int, err error) {
|
|
conf, zero := d.conf, &big.Int{}
|
|
query := fmt.Sprintf("SELECT MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`",
|
|
escapeString(field), escapeString(field), escapeString(db), escapeString(tbl))
|
|
if conf.Where != "" {
|
|
query = fmt.Sprintf("%s WHERE %s", query, conf.Where)
|
|
}
|
|
tctx.L().Debug("split chunks", zap.String("query", query))
|
|
|
|
var smin sql.NullString
|
|
var smax sql.NullString
|
|
err = conn.QuerySQL(tctx, func(rows *sql.Rows) error {
|
|
err := rows.Scan(&smin, &smax)
|
|
rows.Close()
|
|
return err
|
|
}, func() {}, query)
|
|
if err != nil {
|
|
return zero, zero, errors.Annotatef(err, "can't get min/max values to split chunks, query: %s", query)
|
|
}
|
|
if !smax.Valid || !smin.Valid {
|
|
// found no data
|
|
return zero, zero, errors.Errorf("no invalid min/max value found in query %s", query)
|
|
}
|
|
|
|
maxv = new(big.Int)
|
|
minv = new(big.Int)
|
|
var ok bool
|
|
if maxv, ok = maxv.SetString(smax.String, 10); !ok {
|
|
return zero, zero, errors.Errorf("fail to convert max value %s in query %s", smax.String, query)
|
|
}
|
|
if minv, ok = minv.SetString(smin.String, 10); !ok {
|
|
return zero, zero, errors.Errorf("fail to convert min value %s in query %s", smin.String, query)
|
|
}
|
|
return minv, maxv, nil
|
|
}
|
|
|
|
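// concurrentDumpTiDBTables splits a TiDB table into chunks, using TABLESAMPLE REGIONS
// for TiDB v5.0+ or region start keys for older versions, and sends one task per chunk.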
func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error {
|
|
db, tbl := meta.DatabaseName(), meta.TableName()
|
|
|
|
var (
|
|
handleColNames []string
|
|
handleVals [][]string
|
|
err error
|
|
)
|
|
// for TiDB v5.0+, we can use table sample directly
|
|
if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 {
|
|
tctx.L().Debug("dumping TiDB tables with TABLESAMPLE",
|
|
zap.String("database", db), zap.String("table", tbl))
|
|
handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, meta)
|
|
} else {
|
|
// for TiDB v3.0+, we can use table region decode in TiDB directly
|
|
tctx.L().Debug("dumping TiDB tables with TABLE REGIONS",
|
|
zap.String("database", db), zap.String("table", tbl))
|
|
var partitions []string
|
|
if d.conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 {
|
|
partitions, err = GetPartitionNames(tctx, conn, db, tbl)
|
|
}
|
|
if err == nil {
|
|
if len(partitions) != 0 {
|
|
return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions)
|
|
}
|
|
handleColNames, handleVals, err = d.selectTiDBTableRegionFunc(tctx, conn, meta)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1)
|
|
}
|
|
|
|
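// concurrentDumpTiDBPartitionTables splits every partition of a TiDB partitioned table
// by its region start keys and dumps the partitions chunk by chunk.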
func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task, partitions []string) error {
|
|
db, tbl := meta.DatabaseName(), meta.TableName()
|
|
tctx.L().Debug("dumping TiDB tables with TABLE REGIONS for partition table",
|
|
zap.String("database", db), zap.String("table", tbl), zap.Strings("partitions", partitions))
|
|
|
|
startChunkIdx := 0
|
|
totalChunk := 0
|
|
cachedHandleVals := make([][][]string, len(partitions))
|
|
|
|
handleColNames, _, err := selectTiDBRowKeyFields(tctx, conn, meta, checkTiDBTableRegionPkFields)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// cache handleVals here to calculate the total chunks
|
|
for i, partition := range partitions {
|
|
handleVals, err := selectTiDBPartitionRegion(tctx, conn, db, tbl, partition)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
totalChunk += len(handleVals) + 1
|
|
cachedHandleVals[i] = handleVals
|
|
}
|
|
for i, partition := range partitions {
|
|
err := d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
startChunkIdx += len(cachedHandleVals[i]) + 1
|
|
}
|
|
return nil
|
|
}
|
|
|
|
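// sendConcurrentDumpTiDBTasks builds a WHERE clause per chunk from the handle values and
// sends the resulting table-data tasks. An empty handleVals for a non-partitioned table is
// returned as errEmptyHandleVals so the caller can fall back to the row-based split.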
func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context,
|
|
meta TableMeta, taskChan chan<- Task,
|
|
handleColNames []string, handleVals [][]string, partition string,
|
|
startChunkIdx, totalChunk int) error {
|
|
db, tbl := meta.DatabaseName(), meta.TableName()
|
|
if len(handleVals) == 0 {
|
|
if partition == "" {
|
|
// return an error so the caller falls back to the row-based dump method
|
|
return errors.Annotatef(errEmptyHandleVals, "table: `%s`.`%s`", escapeString(db), escapeString(tbl))
|
|
}
|
|
return d.dumpWholeTableDirectly(tctx, meta, taskChan, partition, buildOrderByClauseString(handleColNames), startChunkIdx, totalChunk)
|
|
}
|
|
conf := d.conf
|
|
selectField, selectLen := meta.SelectedField(), meta.SelectedLen()
|
|
where := buildWhereClauses(handleColNames, handleVals)
|
|
orderByClause := buildOrderByClauseString(handleColNames)
|
|
|
|
for i, w := range where {
|
|
query := buildSelectQuery(db, tbl, selectField, partition, buildWhereCondition(conf, w), orderByClause)
|
|
task := d.newTaskTableData(meta, newTableData(query, selectLen, false), i+startChunkIdx, totalChunk)
|
|
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
|
|
if ctxDone {
|
|
return tctx.Err()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// L returns the real logger
|
|
func (d *Dumper) L() log.Logger {
|
|
return d.tctx.L()
|
|
}
|
|
|
|
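// selectTiDBTableSample collects the row-key values of rows sampled with
// TABLESAMPLE REGIONS(); they later serve as chunk boundaries.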
func selectTiDBTableSample(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) {
|
|
pkFields, pkColTypes, err := selectTiDBRowKeyFields(tctx, conn, meta, nil)
|
|
if err != nil {
|
|
return nil, nil, errors.Trace(err)
|
|
}
|
|
|
|
query := buildTiDBTableSampleQuery(pkFields, meta.DatabaseName(), meta.TableName())
|
|
pkValNum := len(pkFields)
|
|
var iter SQLRowIter
|
|
rowRec := MakeRowReceiver(pkColTypes)
|
|
buf := new(bytes.Buffer)
|
|
|
|
err = conn.QuerySQL(tctx, func(rows *sql.Rows) error {
|
|
if iter == nil {
|
|
iter = &rowIter{
|
|
rows: rows,
|
|
args: make([]any, pkValNum),
|
|
}
|
|
}
|
|
err = iter.Decode(rowRec)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
pkValRow := make([]string, 0, pkValNum)
|
|
for _, rec := range rowRec.receivers {
|
|
rec.WriteToBuffer(buf, true)
|
|
pkValRow = append(pkValRow, buf.String())
|
|
buf.Reset()
|
|
}
|
|
pkVals = append(pkVals, pkValRow)
|
|
return nil
|
|
}, func() {
|
|
if iter != nil {
|
|
_ = iter.Close()
|
|
iter = nil
|
|
}
|
|
rowRec = MakeRowReceiver(pkColTypes)
|
|
pkVals = pkVals[:0]
|
|
buf.Reset()
|
|
}, query)
|
|
if err == nil && iter != nil && iter.Error() != nil {
|
|
err = iter.Error()
|
|
}
|
|
|
|
return pkFields, pkVals, err
|
|
}
|
|
|
|
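// buildTiDBTableSampleQuery builds the TABLESAMPLE REGIONS() query used to sample rows,
// ordered by the row-key columns.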
func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string {
|
|
template := "SELECT %s FROM `%s`.`%s` TABLESAMPLE REGIONS() ORDER BY %s"
|
|
quotaPk := make([]string, len(pkFields))
|
|
for i, s := range pkFields {
|
|
quotaPk[i] = fmt.Sprintf("`%s`", escapeString(s))
|
|
}
|
|
pks := strings.Join(quotaPk, ",")
|
|
return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks)
|
|
}
|
|
|
|
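// selectTiDBRowKeyFields returns the table's row-key columns and their types:
// _tidb_rowid for tables with an implicit row ID, otherwise the primary-key columns,
// optionally validated by checkPkFields.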
func selectTiDBRowKeyFields(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) {
|
|
if meta.HasImplicitRowID() {
|
|
pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"}
|
|
} else {
|
|
pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(tctx, conn, meta)
|
|
if err == nil {
|
|
if checkPkFields != nil {
|
|
err = checkPkFields(pkFields, pkColTypes)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
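// checkTiDBTableRegionPkFields ensures the table has exactly one integer primary-key
// column, which the region-based split requires.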
func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
|
|
if len(pkFields) != 1 || len(pkColTypes) != 1 {
|
|
err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
|
|
return
|
|
}
|
|
if _, ok := dataTypeInt[pkColTypes[0]]; !ok {
|
|
err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
|
|
}
|
|
return
|
|
}
|
|
|
|
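// selectTiDBTableRegion reads the table's region start keys from
// INFORMATION_SCHEMA.TIKV_REGION_STATUS and extracts their decoded row IDs as chunk boundaries.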
func selectTiDBTableRegion(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) {
|
|
pkFields, _, err = selectTiDBRowKeyFields(tctx, conn, meta, checkTiDBTableRegionPkFields)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var (
|
|
startKey, decodedKey sql.NullString
|
|
rowID = -1
|
|
)
|
|
const (
|
|
tableRegionSQL = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;"
|
|
tidbRowID = "_tidb_rowid="
|
|
)
|
|
dbName, tableName := meta.DatabaseName(), meta.TableName()
|
|
logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName))
|
|
err = conn.QuerySQL(tctx, func(rows *sql.Rows) error {
|
|
rowID++
|
|
err = rows.Scan(&startKey, &decodedKey)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
// the first region's start key is not used; it may come from another table or be invalid
|
|
if rowID == 0 {
|
|
return nil
|
|
}
|
|
if !startKey.Valid {
|
|
logger.Debug("meet invalid start key", zap.Int("rowID", rowID))
|
|
return nil
|
|
}
|
|
if !decodedKey.Valid {
|
|
logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String))
|
|
return nil
|
|
}
|
|
pkVal, err2 := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String)
|
|
if err2 != nil {
|
|
logger.Debug("cannot extract pkVal from decoded start key",
|
|
zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), log.ShortError(err2))
|
|
} else {
|
|
pkVals = append(pkVals, []string{pkVal})
|
|
}
|
|
return nil
|
|
}, func() {
|
|
pkFields = pkFields[:0]
|
|
pkVals = pkVals[:0]
|
|
}, tableRegionSQL, dbName, tableName)
|
|
|
|
return pkFields, pkVals, errors.Trace(err)
|
|
}
|
|
|
|
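// selectTiDBPartitionRegion reads one partition's region start keys via
// SHOW TABLE ... REGIONS and extracts their row IDs.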
func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *BaseConn, dbName, tableName, partition string) (pkVals [][]string, err error) {
|
|
var startKeys [][]string
|
|
const (
|
|
partitionRegionSQL = "SHOW TABLE `%s`.`%s` PARTITION(`%s`) REGIONS"
|
|
regionRowKey = "r_"
|
|
)
|
|
logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition))
|
|
startKeys, err = conn.QuerySQLWithColumns(tctx, []string{"START_KEY"}, fmt.Sprintf(partitionRegionSQL, escapeString(dbName), escapeString(tableName), escapeString(partition)))
|
|
if err != nil {
|
|
return
|
|
}
|
|
for rowID, startKey := range startKeys {
|
|
if rowID == 0 || len(startKey) != 1 {
|
|
continue
|
|
}
|
|
pkVal, err2 := extractTiDBRowIDFromDecodedKey(regionRowKey, startKey[0])
|
|
if err2 != nil {
|
|
logger.Debug("show table region start key doesn't have rowID",
|
|
zap.Int("rowID", rowID), zap.String("startKey", startKey[0]), zap.Error(err2))
|
|
} else {
|
|
pkVals = append(pkVals, []string{pkVal})
|
|
}
|
|
}
|
|
|
|
return pkVals, nil
|
|
}
|
|
|
|
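// extractTiDBRowIDFromDecodedKey returns the part of key that follows indexField,
// e.g. the value after "_tidb_rowid=".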
func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) {
|
|
if p := strings.Index(key, indexField); p != -1 {
|
|
p += len(indexField)
|
|
return key[p:], nil
|
|
}
|
|
return "", errors.Errorf("decoded key %s doesn't have %s field", key, indexField)
|
|
}
|
|
|
|
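// getListTableTypeByConf picks the table-listing method that best fits the configured
// consistency and server.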
func getListTableTypeByConf(conf *Config) listTableType {
|
|
// use listTableByShowTableStatus by default because it has better performance
|
|
listType := listTableByShowTableStatus
|
|
if conf.Consistency == ConsistencyTypeLock {
|
|
// for consistency lock, we need to build the tables to dump as soon as possible
|
|
listType = listTableByInfoSchema
|
|
} else if conf.Consistency == ConsistencyTypeFlush && matchMysqlBugversion(conf.ServerInfo) {
|
|
// For some buggy versions of mysql, we need a workaround to get a list of table names.
|
|
listType = listTableByShowFullTables
|
|
}
|
|
return listType
|
|
}
|
|
|
|
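// prepareTableListToDump fills conf.Tables with the databases and tables to dump,
// choosing a listing method suited to the server, and applies the table filters.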
func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error {
|
|
if conf.SQL != "" {
|
|
return nil
|
|
}
|
|
|
|
var listType listTableType
|
|
|
|
// TiDB has optimized the performance of reading INFORMATION_SCHEMA.TABLES
|
|
if conf.ServerInfo.ServerType == version.ServerTypeTiDB {
|
|
listType = listTableByInfoSchema
|
|
} else {
|
|
ifSeqExists, err := checkIfSeqExists(db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ifSeqExists {
|
|
listType = listTableByShowFullTables
|
|
} else {
|
|
listType = getListTableTypeByConf(conf)
|
|
}
|
|
}
|
|
|
|
if conf.SpecifiedTables {
|
|
return updateSpecifiedTablesMeta(tctx, db, conf.Tables, listType)
|
|
}
|
|
databases, err := prepareDumpingDatabases(tctx, conf, db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tableTypes := []TableType{TableTypeBase}
|
|
if !conf.NoViews {
|
|
tableTypes = append(tableTypes, TableTypeView)
|
|
}
|
|
if !conf.NoSequences {
|
|
tableTypes = append(tableTypes, TableTypeSequence)
|
|
}
|
|
|
|
conf.Tables, err = ListAllDatabasesTables(tctx, db, databases, listType, tableTypes...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
filterTables(tctx, conf)
|
|
return nil
|
|
}
|
|
|
|
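// dumpTableMeta collects the metadata of a table, view or sequence: the selected fields,
// column types, implicit row ID flag and the CREATE statement.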
func dumpTableMeta(tctx *tcontext.Context, conf *Config, conn *BaseConn, db string, table *TableInfo) (TableMeta, error) {
|
|
tbl := table.Name
|
|
selectField, selectLen, err := buildSelectField(tctx, conn, db, tbl, conf.CompleteInsert)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var (
|
|
colTypes []*sql.ColumnType
|
|
hasImplicitRowID bool
|
|
)
|
|
if conf.ServerInfo.ServerType == version.ServerTypeTiDB {
|
|
hasImplicitRowID, err = SelectTiDBRowID(tctx, conn, db, tbl)
|
|
if err != nil {
|
|
tctx.L().Info("check implicit rowID failed", zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
|
|
}
|
|
}
|
|
|
|
// If all columns are generated, selectField is empty and we fall back to '*' to fetch the column types
|
|
if table.Type == TableTypeBase {
|
|
if selectField == "" {
|
|
colTypes, err = GetColumnTypes(tctx, conn, "*", db, tbl)
|
|
} else {
|
|
colTypes, err = GetColumnTypes(tctx, conn, selectField, db, tbl)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
meta := &tableMeta{
|
|
avgRowLength: table.AvgRowLength,
|
|
database: db,
|
|
table: tbl,
|
|
colTypes: colTypes,
|
|
selectedField: selectField,
|
|
selectedLen: selectLen,
|
|
hasImplicitRowID: hasImplicitRowID,
|
|
specCmts: getSpecialComments(conf.ServerInfo.ServerType),
|
|
}
|
|
|
|
if conf.NoSchemas {
|
|
return meta, nil
|
|
}
|
|
switch table.Type {
|
|
case TableTypeView:
|
|
viewName := table.Name
|
|
createTableSQL, createViewSQL, err1 := ShowCreateView(tctx, conn, db, viewName)
|
|
if err1 != nil {
|
|
return meta, err1
|
|
}
|
|
meta.showCreateTable = createTableSQL
|
|
meta.showCreateView = createViewSQL
|
|
return meta, nil
|
|
case TableTypeSequence:
|
|
sequenceName := table.Name
|
|
createSequenceSQL, err2 := ShowCreateSequence(tctx, conn, db, sequenceName, conf)
|
|
if err2 != nil {
|
|
return meta, err2
|
|
}
|
|
meta.showCreateTable = createSequenceSQL
|
|
return meta, nil
|
|
}
|
|
|
|
createTableSQL, err := ShowCreateTable(tctx, conn, db, tbl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
meta.showCreateTable = createTableSQL
|
|
return meta, nil
|
|
}
|
|
|
|
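// dumpSQL dumps the result of the user-specified SQL statement as a single task.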
func (d *Dumper) dumpSQL(tctx *tcontext.Context, metaConn *BaseConn, taskChan chan<- Task) {
|
|
conf := d.conf
|
|
meta := &tableMeta{}
|
|
data := newTableData(conf.SQL, 0, true)
|
|
task := d.newTaskTableData(meta, data, 0, 1)
|
|
c := detectEstimateRows(tctx, metaConn, fmt.Sprintf("EXPLAIN %s", conf.SQL), []string{"rows", "estRows", "count"})
|
|
AddCounter(d.metrics.estimateTotalRowsCounter, float64(c))
|
|
atomic.StoreInt64(&d.totalTables, int64(1))
|
|
d.sendTaskToChan(tctx, task, taskChan)
|
|
}
|
|
|
|
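// canRebuildConn reports whether a dumping connection may be rebuilt without breaking
// the chosen consistency.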
func canRebuildConn(consistency string, trxConsistencyOnly bool) bool {
|
|
switch consistency {
|
|
case ConsistencyTypeLock, ConsistencyTypeFlush:
|
|
return !trxConsistencyOnly
|
|
case ConsistencyTypeSnapshot, ConsistencyTypeNone:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Close closes the Dumper and stops dumping immediately
|
|
func (d *Dumper) Close() error {
|
|
d.cancelCtx()
|
|
d.metrics.unregisterFrom(d.conf.PromRegistry)
|
|
if d.dbHandle != nil {
|
|
return d.dbHandle.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
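// runSteps runs the given initialization steps in order and stops at the first error.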
func runSteps(d *Dumper, steps ...func(*Dumper) error) error {
|
|
for _, st := range steps {
|
|
err := st(d)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func initLogger(d *Dumper) error {
|
|
conf := d.conf
|
|
var (
|
|
logger log.Logger
|
|
err error
|
|
props *pclog.ZapProperties
|
|
)
|
|
// conf.Logger != nil means dumpling is used as a library
|
|
if conf.Logger != nil {
|
|
logger = log.NewAppLogger(conf.Logger)
|
|
} else {
|
|
logger, props, err = log.InitAppLogger(&log.Config{
|
|
Level: conf.LogLevel,
|
|
File: conf.LogFile,
|
|
Format: conf.LogFormat,
|
|
})
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
pclog.ReplaceGlobals(logger.Logger, props)
|
|
cli.LogLongVersion(logger)
|
|
}
|
|
d.tctx = d.tctx.WithLogger(logger)
|
|
return nil
|
|
}
|
|
|
|
// createExternalStore is an initialization step of Dumper.
|
|
func createExternalStore(d *Dumper) error {
|
|
tctx, conf := d.tctx, d.conf
|
|
extStore, err := conf.createExternalStorage(tctx)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
d.extStore = extStore
|
|
return nil
|
|
}
|
|
|
|
// startHTTPService is an initialization step of Dumper.
|
|
func startHTTPService(d *Dumper) error {
|
|
conf := d.conf
|
|
if conf.StatusAddr != "" {
|
|
go func() {
|
|
err := startDumplingService(d.tctx, conf.StatusAddr)
|
|
if err != nil {
|
|
d.L().Info("meet error when stopping dumpling http service", log.ShortError(err))
|
|
}
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// openSQLDB is an initialization step of Dumper.
|
|
func openSQLDB(d *Dumper) error {
|
|
if d.conf.IOTotalBytes != nil {
|
|
mysql.RegisterDialContext(d.conf.Net, func(ctx context.Context, addr string) (net.Conn, error) {
|
|
dial := &net.Dialer{}
|
|
conn, err := dial.DialContext(ctx, "tcp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tcpConn := conn.(*net.TCPConn)
|
|
// try https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connector.go#L56-L64
|
|
err = tcpConn.SetKeepAlive(true)
|
|
if err != nil {
|
|
d.tctx.L().Logger.Warn("fail to keep alive", zap.Error(err))
|
|
}
|
|
return util.NewTCPConnWithIOCounter(tcpConn, d.conf.IOTotalBytes), nil
|
|
})
|
|
}
|
|
conf := d.conf
|
|
c, err := mysql.NewConnector(conf.GetDriverConfig(""))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
d.dbHandle = sql.OpenDB(c)
|
|
return nil
|
|
}
|
|
|
|
// detectServerInfo is an initialization step of Dumper.
|
|
func detectServerInfo(d *Dumper) error {
|
|
db, conf := d.dbHandle, d.conf
|
|
versionStr, err := version.FetchVersion(d.tctx.Context, db)
|
|
if err != nil {
|
|
conf.ServerInfo = ServerInfoUnknown
|
|
return err
|
|
}
|
|
conf.ServerInfo = version.ParseServerInfo(versionStr)
|
|
return nil
|
|
}
|
|
|
|
// resolveAutoConsistency is an initialization step of Dumper.
|
|
func resolveAutoConsistency(d *Dumper) error {
|
|
conf := d.conf
|
|
if conf.Consistency != ConsistencyTypeAuto {
|
|
return nil
|
|
}
|
|
switch conf.ServerInfo.ServerType {
|
|
case version.ServerTypeTiDB:
|
|
conf.Consistency = ConsistencyTypeSnapshot
|
|
case version.ServerTypeMySQL, version.ServerTypeMariaDB:
|
|
conf.Consistency = ConsistencyTypeFlush
|
|
default:
|
|
conf.Consistency = ConsistencyTypeNone
|
|
}
|
|
|
|
if conf.Consistency == ConsistencyTypeFlush {
|
|
timeout := time.Second * 5
|
|
ctx, cancel := context.WithTimeout(d.tctx.Context, timeout)
|
|
defer cancel()
|
|
|
|
// probe whether the upstream has enough privileges to run FLUSH TABLES WITH READ LOCK
|
|
conn, err := d.dbHandle.Conn(ctx)
|
|
if err != nil {
|
|
return errors.New("failed to get connection from db pool after 5 seconds")
|
|
}
|
|
//nolint: errcheck
|
|
defer conn.Close()
|
|
|
|
err = FlushTableWithReadLock(d.tctx, conn)
|
|
//nolint: errcheck
|
|
defer UnlockTables(d.tctx, conn)
|
|
if err != nil {
|
|
// fallback to ConsistencyTypeLock
|
|
d.tctx.L().Warn("error when use FLUSH TABLE WITH READ LOCK, fallback to LOCK TABLES",
|
|
zap.Error(err))
|
|
conf.Consistency = ConsistencyTypeLock
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func validateResolveAutoConsistency(d *Dumper) error {
|
|
conf := d.conf
|
|
if conf.Consistency != ConsistencyTypeSnapshot && conf.Snapshot != "" {
|
|
return errors.Errorf("can't specify --snapshot when --consistency isn't snapshot, resolved consistency: %s", conf.Consistency)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tidbSetPDClientForGC is an initialization step of Dumper.
|
|
func tidbSetPDClientForGC(d *Dumper) error {
|
|
tctx, si, pool := d.tctx, d.conf.ServerInfo, d.dbHandle
|
|
if si.ServerType != version.ServerTypeTiDB ||
|
|
si.ServerVersion == nil ||
|
|
si.ServerVersion.Compare(*gcSafePointVersion) < 0 {
|
|
return nil
|
|
}
|
|
pdAddrs, err := GetPdAddrs(tctx, pool)
|
|
if err != nil {
|
|
tctx.L().Info("meet some problem while fetching pd addrs. This won't affect dump process", log.ShortError(err))
|
|
return nil
|
|
}
|
|
if len(pdAddrs) > 0 {
|
|
doPdGC, err := checkSameCluster(tctx, pool, pdAddrs)
|
|
if err != nil {
|
|
tctx.L().Info("meet error while check whether fetched pd addr and TiDB belong to one cluster. This won't affect dump process", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
|
|
} else if doPdGC {
|
|
pdClient, err := pd.NewClientWithContext(tctx, caller.Component("dumpling-gc"), pdAddrs, pd.SecurityOption{})
|
|
if err != nil {
|
|
tctx.L().Info("create pd client to control GC failed. This won't affect dump process", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
|
|
}
|
|
d.tidbPDClientForGC = pdClient
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tidbGetSnapshot is an initialization step of Dumper.
|
|
func tidbGetSnapshot(d *Dumper) error {
|
|
conf, doPdGC := d.conf, d.tidbPDClientForGC != nil
|
|
consistency := conf.Consistency
|
|
pool, tctx := d.dbHandle, d.tctx
|
|
snapshotConsistency := consistency == "snapshot"
|
|
if conf.Snapshot == "" && (doPdGC || snapshotConsistency) {
|
|
conn, err := pool.Conn(tctx)
|
|
if err != nil {
|
|
tctx.L().Warn("fail to open connection to get snapshot from TiDB", log.ShortError(err))
|
|
// for consistency snapshot, we must get a snapshot here, or we will dump inconsistent data. For other consistency levels we can ignore this error.
|
|
if !snapshotConsistency {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
snapshot, err := getSnapshot(conn)
|
|
_ = conn.Close()
|
|
if err != nil {
|
|
tctx.L().Warn("fail to get snapshot from TiDB", log.ShortError(err))
|
|
// for consistency snapshot, we must get a snapshot here, or we will dump inconsistent data. For other consistency levels we can ignore this error.
|
|
if !snapshotConsistency {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
conf.Snapshot = snapshot
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tidbStartGCSavepointUpdateService is an initialization step of Dumper.
|
|
func tidbStartGCSavepointUpdateService(d *Dumper) error {
|
|
tctx, pool, conf := d.tctx, d.dbHandle, d.conf
|
|
snapshot, si := conf.Snapshot, conf.ServerInfo
|
|
if d.tidbPDClientForGC != nil {
|
|
snapshotTS, err := parseSnapshotToTSO(pool, snapshot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go updateServiceSafePoint(tctx, d.tidbPDClientForGC, defaultDumpGCSafePointTTL, snapshotTS)
|
|
} else if si.ServerType == version.ServerTypeTiDB {
|
|
tctx.L().Warn("If the amount of data to dump is large, criteria: (data more than 60GB or dumped time more than 10 minutes)\n" +
|
|
"you'd better adjust the tikv_gc_life_time to avoid export failure due to TiDB GC during the dump process.\n" +
|
|
"Before dumping: run sql `update mysql.tidb set VARIABLE_VALUE = '720h' where VARIABLE_NAME = 'tikv_gc_life_time';` in tidb.\n" +
|
|
"After dumping: run sql `update mysql.tidb set VARIABLE_VALUE = '10m' where VARIABLE_NAME = 'tikv_gc_life_time';` in tidb.\n")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
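// updateServiceSafePoint periodically refreshes dumpling's GC service safe point in PD
// so that TiDB GC does not reclaim the snapshot being dumped.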
func updateServiceSafePoint(tctx *tcontext.Context, pdClient pd.Client, ttl int64, snapshotTS uint64) {
|
|
updateInterval := time.Duration(ttl/2) * time.Second
|
|
tick := time.NewTicker(updateInterval)
|
|
dumplingServiceSafePointID := fmt.Sprintf("%s_%d", dumplingServiceSafePointPrefix, time.Now().UnixNano())
|
|
tctx.L().Info("generate dumpling gc safePoint id", zap.String("id", dumplingServiceSafePointID))
|
|
|
|
for {
|
|
tctx.L().Debug("update PD safePoint limit with ttl",
|
|
zap.Uint64("safePoint", snapshotTS),
|
|
zap.Int64("ttl", ttl))
|
|
for retryCnt := range 11 {
|
|
_, err := pdClient.UpdateServiceGCSafePoint(tctx, dumplingServiceSafePointID, ttl, snapshotTS)
|
|
if err == nil {
|
|
break
|
|
}
|
|
tctx.L().Debug("update PD safePoint failed", log.ShortError(err), zap.Int("retryTime", retryCnt))
|
|
select {
|
|
case <-tctx.Done():
|
|
return
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
select {
|
|
case <-tctx.Done():
|
|
return
|
|
case <-tick.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// setDefaultSessionParams fills in default session parameters that the user has not explicitly set.
|
|
func setDefaultSessionParams(si version.ServerInfo, sessionParams map[string]any) {
|
|
defaultSessionParams := map[string]any{}
|
|
if si.ServerType == version.ServerTypeTiDB && si.HasTiKV && si.ServerVersion.Compare(*enablePagingVersion) >= 0 {
|
|
defaultSessionParams["tidb_enable_paging"] = "ON"
|
|
}
|
|
for k, v := range defaultSessionParams {
|
|
if _, ok := sessionParams[k]; !ok {
|
|
sessionParams[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// setSessionParam is an initialization step of Dumper.
|
|
func setSessionParam(d *Dumper) error {
|
|
conf, pool := d.conf, d.dbHandle
|
|
si := conf.ServerInfo
|
|
consistency, snapshot := conf.Consistency, conf.Snapshot
|
|
sessionParam := conf.SessionParams
|
|
if si.ServerType == version.ServerTypeTiDB && conf.TiDBMemQuotaQuery != UnspecifiedSize {
|
|
sessionParam[TiDBMemQuotaQueryName] = conf.TiDBMemQuotaQuery
|
|
}
|
|
var err error
|
|
if snapshot != "" {
|
|
if si.ServerType != version.ServerTypeTiDB {
|
|
return errors.New("snapshot consistency is not supported for this server")
|
|
}
|
|
if consistency == ConsistencyTypeSnapshot {
|
|
conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool)
|
|
if err != nil {
|
|
d.L().Info("cannot check whether TiDB has TiKV, will apply tidb_snapshot by default. This won't affect dump process", log.ShortError(err))
|
|
}
|
|
if conf.ServerInfo.HasTiKV {
|
|
sessionParam[snapshotVar] = snapshot
|
|
}
|
|
}
|
|
}
|
|
if d.dbHandle, err = resetDBWithSessionParams(d.tctx, pool, conf.GetDriverConfig(""), conf.SessionParams); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func openDB(cfg *mysql.Config) (*sql.DB, error) {
|
|
c, err := mysql.NewConnector(cfg)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
return sql.OpenDB(c), nil
|
|
}
|
|
|
|
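// renewSelectTableRegionFuncForLowerTiDB rebuilds selectTiDBTableRegionFunc for TiDB 3.x
// by decoding region start keys on the dumpling side, since those versions cannot decode
// them in SQL.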
func (d *Dumper) renewSelectTableRegionFuncForLowerTiDB(tctx *tcontext.Context) error {
|
|
conf := d.conf
|
|
if !(conf.ServerInfo.ServerType == version.ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && conf.ServerInfo.HasTiKV &&
|
|
conf.ServerInfo.ServerVersion.Compare(*decodeRegionVersion) >= 0 &&
|
|
conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) < 0) {
|
|
tctx.L().Debug("no need to build region info because database is not TiDB 3.x")
|
|
return nil
|
|
}
|
|
// for TiDB 3.x, the original selectTiDBTableRegionFunc will always fail,
// because TiDB 3.x has neither the `tidb_decode_key` function nor the `DB_NAME`,`TABLE_NAME` columns in `INFORMATION_SCHEMA.TIKV_REGION_STATUS`.
// reference: https://github.com/pingcap/tidb/blob/c497d5c/dumpling/export/dump.go#L775
// To avoid this function continuously returning errors and confusing users if the initialization below fails,
// selectTiDBTableRegionFunc is first set to always return an ignorable error.
|
|
d.selectTiDBTableRegionFunc = func(_ *tcontext.Context, _ *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) {
|
|
return nil, nil, errors.Annotatef(errEmptyHandleVals, "table: `%s`.`%s`", escapeString(meta.DatabaseName()), escapeString(meta.TableName()))
|
|
}
|
|
dbHandle, err := openDBFunc(conf.GetDriverConfig(""))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
defer func() {
|
|
_ = dbHandle.Close()
|
|
}()
|
|
conn, err := dbHandle.Conn(tctx)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
defer func() {
|
|
_ = conn.Close()
|
|
}()
|
|
dbInfos, err := GetDBInfo(conn, DatabaseTablesToMap(conf.Tables))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
regionsInfo, err := GetRegionInfos(conn)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
tikvHelper := &helper.Helper{}
|
|
tableInfos := tikvHelper.GetRegionsTableInfo(regionsInfo, infoschema.DBInfoAsInfoSchema(dbInfos), nil)
|
|
|
|
tableInfoMap := make(map[string]map[string][]int64, len(conf.Tables))
|
|
for _, region := range regionsInfo.Regions {
|
|
tableList := tableInfos[region.ID]
|
|
for _, table := range tableList {
|
|
db, tbl := table.DB.Name.O, table.Table.Name.O
|
|
if _, ok := tableInfoMap[db]; !ok {
|
|
tableInfoMap[db] = make(map[string][]int64, len(conf.Tables[db]))
|
|
}
|
|
|
|
key, err := hex.DecodeString(region.StartKey)
|
|
if err != nil {
|
|
d.L().Debug("invalid region start key", log.ShortError(err), zap.String("key", region.StartKey))
|
|
continue
|
|
}
|
|
// Auto decode byte if needed.
|
|
_, bs, err := codec.DecodeBytes(key, nil)
|
|
if err == nil {
|
|
key = bs
|
|
}
|
|
// Try to decode it as a record key.
|
|
tableID, handle, err := tablecodec.DecodeRecordKey(key)
|
|
if err != nil {
|
|
d.L().Debug("cannot decode region start key", log.ShortError(err), zap.String("key", region.StartKey), zap.Int64("tableID", tableID))
|
|
continue
|
|
}
|
|
if handle.IsInt() {
|
|
tableInfoMap[db][tbl] = append(tableInfoMap[db][tbl], handle.IntValue())
|
|
} else {
|
|
d.L().Debug("not an int handle", log.ShortError(err), zap.Stringer("handle", handle))
|
|
}
|
|
}
|
|
}
|
|
for _, tbInfos := range tableInfoMap {
|
|
for _, tbInfoLoop := range tbInfos {
|
|
// make sure tbInfo is only used in this loop
|
|
tbInfo := tbInfoLoop
|
|
slices.Sort(tbInfo)
|
|
}
|
|
}
|
|
|
|
d.selectTiDBTableRegionFunc = func(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) {
|
|
pkFields, _, err = selectTiDBRowKeyFields(tctx, conn, meta, checkTiDBTableRegionPkFields)
|
|
if err != nil {
|
|
return
|
|
}
|
|
dbName, tableName := meta.DatabaseName(), meta.TableName()
|
|
if tbInfos, ok := tableInfoMap[dbName]; ok {
|
|
if tbInfo, ok := tbInfos[tableName]; ok {
|
|
pkVals = make([][]string, len(tbInfo))
|
|
for i, val := range tbInfo {
|
|
pkVals[i] = []string{strconv.FormatInt(val, 10)}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
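// newTaskTableData wraps NewTaskTableData and counts the new chunk in the metrics.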
func (d *Dumper) newTaskTableData(meta TableMeta, data TableDataIR, currentChunk, totalChunks int) *TaskTableData {
|
|
d.metrics.totalChunks.Add(1)
|
|
return NewTaskTableData(meta, data, currentChunk, totalChunks)
|
|
}
|