brie: support batch ddl for sql restore (#49089)

close pingcap/tidb#48301
This commit is contained in:
Jianjun Liao
2023-12-27 18:27:27 +08:00
committed by GitHub
parent 063228660d
commit 8709bb53df
11 changed files with 669 additions and 430 deletions

View File

@ -14,9 +14,7 @@ go_library(
"//pkg/domain",
"//pkg/executor",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/session",
"//pkg/session/types",
"//pkg/sessionctx",
@ -33,17 +31,11 @@ go_test(
srcs = ["glue_test.go"],
embed = [":gluetidb"],
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/glue",
"//pkg/ddl",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/types",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)

View File

@ -3,9 +3,7 @@
package gluetidb
import (
"bytes"
"context"
"strings"
"time"
"github.com/pingcap/errors"
@ -18,9 +16,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/session"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/sessionctx"
@ -34,11 +30,7 @@ var (
_ glue.Glue = Glue{}
)
const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
brComment = `/*from(br)*/`
)
const brComment = `/*from(br)*/`
// New makes a new tidb glue.
func New() Glue {
@ -207,17 +199,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...
// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateDatabase(schema)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
return errors.Trace(executor.BRIECreateDatabase(gs.se, schema, brComment))
}
// CreatePlacementPolicy implements glue.Session.
@ -228,95 +210,16 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}
// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr,
infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...)
if kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
}
mid := len(infos) / 2
err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...)
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...)
if err != nil {
return err
}
return nil
}
return err
}
// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr
// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
for db, tablesInDB := range tables {
dbName = model.NewCIStr(db)
queryBuilder := strings.Builder{}
cloneTables := make([]*model.TableInfo, 0, len(tablesInDB))
for _, table := range tablesInDB {
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
queryBuilder.WriteString(query)
queryBuilder.WriteString(";")
table = table.Clone()
// Clone() does not clone partitions yet :(
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
log.Warn("batch create table from tidb failure", zap.Error(err))
return err
}
}
return nil
return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...))
}
// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...))
}
// Close implements glue.Session.
@ -329,30 +232,6 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}
// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
table.AutoIncID = 0
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}
// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo.
func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}
// showCreatePlacementPolicy renders the statement text for a placement policy
// by delegating to executor.ConstructResultOfShowCreatePlacementPolicy.
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
	stmt := executor.ConstructResultOfShowCreatePlacementPolicy(policy)
	return stmt
}

View File

@ -16,206 +16,15 @@ package gluetidb
import (
"context"
"strconv"
"testing"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)
// batch create table with table id reused
func TestSplitBatchCreateTableWithTableId(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists table_id_resued1")
tk.MustExec("drop table if exists table_id_resued2")
tk.MustExec("drop table if exists table_id_new")
d := dom.DDL()
require.NotNil(t, d)
infos1 := []*model.TableInfo{}
infos1 = append(infos1, &model.TableInfo{
ID: 124,
Name: model.NewCIStr("table_id_resued1"),
})
infos1 = append(infos1, &model.TableInfo{
ID: 125,
Name: model.NewCIStr("table_id_resued2"),
})
se := &tidbSession{se: tk.Session()}
// keep/reused table id verification
tk.Session().SetValue(sessionctx.QueryString, "skip")
err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return false
}))
require.NoError(t, err)
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").
Check(testkit.Rows("124"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").
Check(testkit.Rows("125"))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
// allocate new table id verification
// query the global id
var id int64
err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err error
id, err = m.GenGlobalID()
return err
})
require.NoError(t, err)
infos2 := []*model.TableInfo{}
infos2 = append(infos2, &model.TableInfo{
ID: 124,
Name: model.NewCIStr("table_id_new"),
})
tk.Session().SetValue(sessionctx.QueryString, "skip")
err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return true
}))
require.NoError(t, err)
idGen, ok := tk.MustQuery(
"select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").
Rows()[0][0].(string)
require.True(t, ok)
idGenNum, err := strconv.ParseInt(idGen, 10, 64)
require.NoError(t, err)
require.Greater(t, idGenNum, id)
// a empty table info with len(info3) = 0
infos3 := []*model.TableInfo{}
err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return false
}))
require.NoError(t, err)
}
// batch create table with table id reused
func TestSplitBatchCreateTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists table_1")
tk.MustExec("drop table if exists table_2")
tk.MustExec("drop table if exists table_3")
d := dom.DDL()
require.NotNil(t, d)
infos := []*model.TableInfo{}
infos = append(infos, &model.TableInfo{
ID: 1234,
Name: model.NewCIStr("tables_1"),
})
infos = append(infos, &model.TableInfo{
ID: 1235,
Name: model.NewCIStr("tables_2"),
})
infos = append(infos, &model.TableInfo{
ID: 1236,
Name: model.NewCIStr("tables_3"),
})
se := &tidbSession{se: tk.Session()}
// keep/reused table id verification
tk.Session().SetValue(sessionctx.QueryString, "skip")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)"))
err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return false
}))
require.NoError(t, err)
tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3"))
jobs := tk.MustQuery("admin show ddl jobs").Rows()
require.Greater(t, len(jobs), 3)
// check table_1
job1 := jobs[0]
require.Equal(t, "test", job1[1])
require.Equal(t, "tables_3", job1[2])
require.Equal(t, "create tables", job1[3])
require.Equal(t, "public", job1[4])
// check table_2
job2 := jobs[1]
require.Equal(t, "test", job2[1])
require.Equal(t, "tables_2", job2[2])
require.Equal(t, "create tables", job2[3])
require.Equal(t, "public", job2[4])
// check table_3
job3 := jobs[2]
require.Equal(t, "test", job3[1])
require.Equal(t, "tables_1", job3[2])
require.Equal(t, "create tables", job3[3])
require.Equal(t, "public", job3[4])
// check reused table id
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").
Check(testkit.Rows("1234"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").
Check(testkit.Rows("1235"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").
Check(testkit.Rows("1236"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"))
}
// batch create table with table id reused
func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists table_1")
tk.MustExec("drop table if exists table_2")
tk.MustExec("drop table if exists table_3")
d := dom.DDL()
require.NotNil(t, d)
infos := []*model.TableInfo{}
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_1"),
})
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_2"),
})
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_3"),
})
se := &tidbSession{se: tk.Session()}
tk.Session().SetValue(sessionctx.QueryString, "skip")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)"))
err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
return true
}))
require.True(t, kv.ErrEntryTooLarge.Equal(err))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"))
}
func TestTheSessionIsoation(t *testing.T) {
req := require.New(t)
store := testkit.CreateMockStore(t)

View File

@ -230,24 +230,22 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error {
rc.backupMeta = new(backuppb.BackupMeta)
}
// Only in binary we can use multi-thread sessions to create tables.
// so use OwnStorage() to tell whether we are use binary or SQL.
if g.OwnsStorage() {
// Maybe allow user modify the DDL concurrency isn't necessary,
// because executing DDL is really I/O bound (or, algorithm bound?),
// and we cost most of time at waiting DDL jobs be enqueued.
// So these jobs won't be faster or slower when machine become faster or slower,
// hence make it a fixed value would be fine.
rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) {
db, _, err := NewDB(g, store, rc.policyMode)
return db, err
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(rc.dbPool)),
)
}
// Sessions are created differently in binary mode and in SQL mode.
//
// Letting users tune the DDL concurrency is probably unnecessary:
// executing DDL is I/O bound (or perhaps algorithm bound), and most of
// the time is spent waiting for DDL jobs to be enqueued.
// These jobs therefore do not get faster or slower with the machine,
// so a fixed value is fine.
rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) {
db, _, err := NewDB(g, store, rc.policyMode)
return db, err
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(rc.dbPool)),
)
}
return errors.Trace(err)
}
@ -487,12 +485,21 @@ func (rc *Client) GetRewriteMode() RewriteMode {
return rc.rewriteMode
}
// Close a client.
func (rc *Client) Close() {
func (rc *Client) closeConn() {
// rc.db can be nil in raw kv mode.
if rc.db != nil {
rc.db.Close()
}
for _, db := range rc.dbPool {
db.Close()
}
}
// Close a client.
func (rc *Client) Close() {
// Close the connections; this must succeed in SQL mode.
rc.closeConn()
if rc.rawKVClient != nil {
rc.rawKVClient.Close()
}

View File

@ -281,7 +281,9 @@ func makeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
if e != nil {
return dbPool, e
}
dbPool = append(dbPool, db)
if db != nil {
dbPool = append(dbPool, db)
}
}
return dbPool, nil
}

View File

@ -18,6 +18,7 @@ go_library(
"batch_point_get.go",
"bind.go",
"brie.go",
"brie_utils.go",
"builder.go",
"change.go",
"checksum.go",
@ -297,6 +298,7 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"brie_utils_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
"compact_table_test.go",

View File

@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/model"
@ -101,7 +100,7 @@ func (p *brieTaskProgress) Close() {
p.lock.Lock()
current := atomic.LoadInt64(&p.current)
if current < p.total {
p.cmd = fmt.Sprintf("%s Cacneled", p.cmd)
p.cmd = fmt.Sprintf("%s Canceled", p.cmd)
}
atomic.StoreInt64(&p.current, p.total)
p.lock.Unlock()
@ -564,7 +563,7 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error {
defer bq.releaseTask()
e.info.execTime = types.CurrentTime(mysql.TypeDatetime)
glue := &tidbGlueSession{se: e.Ctx(), progress: progress, info: e.info}
glue := &tidbGlue{se: e.Ctx(), progress: progress, info: e.info}
switch e.info.kind {
case ast.BRIEKindBackup:
@ -632,25 +631,82 @@ func (e *ShowExec) fetchShowBRIE(kind ast.BRIEKind) error {
return nil
}
type tidbGlueSession struct {
type tidbGlue struct {
// the session context of the brie task
se sessionctx.Context
progress *brieTaskProgress
info *brieTaskInfo
}
// GetSessionCtx implements glue.Glue
func (gs *tidbGlueSession) GetSessionCtx() sessionctx.Context {
return gs.se
}
// GetDomain implements glue.Glue
func (gs *tidbGlueSession) GetDomain(_ kv.Storage) (*domain.Domain, error) {
func (gs *tidbGlue) GetDomain(_ kv.Storage) (*domain.Domain, error) {
return domain.GetDomain(gs.se), nil
}
// CreateSession implements glue.Glue
func (gs *tidbGlueSession) CreateSession(_ kv.Storage) (glue.Session, error) {
return gs, nil
func (gs *tidbGlue) CreateSession(_ kv.Storage) (glue.Session, error) {
newSCtx, err := CreateSession(gs.se)
if err != nil {
return nil, err
}
return &tidbGlueSession{se: newSCtx}, nil
}
// Open implements glue.Glue.
// Both arguments are ignored; the storage handed back is the one already
// attached to the BRIE task's session context.
func (gs *tidbGlue) Open(string, pd.SecurityOption) (kv.Storage, error) {
	store := gs.se.GetStore()
	return store, nil
}
// OwnsStorage implements glue.Glue.
// It always reports false.
// NOTE(review): presumably because the storage belongs to the running TiDB
// session rather than to this glue — confirm against glue.Glue's contract.
func (*tidbGlue) OwnsStorage() bool {
return false
}
// StartProgress implements glue.Glue.
// It resets the task's shared progress tracker for a new phase: the command
// name and total are replaced under the tracker's lock and the current
// counter is set back to zero. The context and the trailing bool are ignored.
// The same tracker is returned as the glue.Progress.
func (gs *tidbGlue) StartProgress(_ context.Context, cmdName string, total int64, _ bool) glue.Progress {
	p := gs.progress
	p.lock.Lock()
	p.cmd, p.total = cmdName, total
	atomic.StoreInt64(&p.current, 0)
	p.lock.Unlock()
	return p
}
// Record implements glue.Glue.
// It stores value into the task info field selected by name:
// "BackupTS", "RestoreTS" or "Size". Any other name is silently ignored.
func (gs *tidbGlue) Record(name string, value uint64) {
	switch info := gs.info; name {
	case "BackupTS":
		info.backupTS = value
	case "RestoreTS":
		info.restoreTS = value
	case "Size":
		info.archiveSize = value
	}
}
// GetVersion implements glue.Glue.
// It returns the literal line "TiDB" followed by the build information
// reported by printer.GetTiDBInfo.
func (*tidbGlue) GetVersion() string {
	info := printer.GetTiDBInfo()
	return "TiDB\n" + info
}
// UseOneShotSession implements glue.Glue.
// In SQL mode the domain does not need to be closed, but fn still gets its
// own session: a new session context is derived from the task's session,
// wrapped in a tidbGlueSession, and closed again once fn returns.
// The storage and bool arguments are ignored.
func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Session) error) error {
	newSCtx, err := CreateSession(gs.se)
	if err != nil {
		return err
	}
	defer func() {
		CloseSession(newSCtx)
		log.Info("one shot session from brie closed")
	}()
	return fn(&tidbGlueSession{se: newSCtx})
}
// tidbGlueSession implements glue.Session on top of a single session context.
type tidbGlueSession struct {
// se is the session context used by a subtask of the BRIE task,
// such as `CREATE TABLE`.
se sessionctx.Context
}
// Execute implements glue.Session
@ -672,46 +728,24 @@ func (gs *tidbGlueSession) ExecuteInternal(ctx context.Context, sql string, args
// CreateDatabase implements glue.Session
func (gs *tidbGlueSession) CreateDatabase(_ context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
// 512 is defaultCapOfCreateTable.
result := bytes.NewBuffer(make([]byte, 0, 512))
if err := ConstructResultOfShowCreateDatabase(gs.se, schema, true, result); err != nil {
return err
}
gs.se.SetValue(sessionctx.QueryString, result.String())
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
return BRIECreateDatabase(gs.se, schema, "")
}
// CreateTable implements glue.Session
func (gs *tidbGlueSession) CreateTable(_ context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
return BRIECreateTable(gs.se, dbName, table, "", cs...)
}
// 512 is defaultCapOfCreateTable.
result := bytes.NewBuffer(make([]byte, 0, 512))
if err := ConstructResultOfShowCreateTable(gs.se, table, autoid.Allocators{}, result); err != nil {
return err
}
gs.se.SetValue(sessionctx.QueryString, result.String())
// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbGlueSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
return BRIECreateTables(gs.se, tables, "", cs...)
}
// CreatePlacementPolicy implements glue.Session
func (gs *tidbGlueSession) CreatePlacementPolicy(_ context.Context, policy *model.PolicyInfo) error {
originQueryString := gs.se.Value(sessionctx.QueryString)
defer gs.se.SetValue(sessionctx.QueryString, originQueryString)
gs.se.SetValue(sessionctx.QueryString, ConstructResultOfShowCreatePlacementPolicy(policy))
d := domain.GetDomain(gs.se).DDL()
// the default behaviour is ignoring duplicated policy during restore.
@ -719,7 +753,8 @@ func (gs *tidbGlueSession) CreatePlacementPolicy(_ context.Context, policy *mode
}
// Close implements glue.Session
func (*tidbGlueSession) Close() {
func (gs *tidbGlueSession) Close() {
CloseSession(gs.se)
}
// GetGlobalVariables implements glue.Session.
@ -727,46 +762,9 @@ func (gs *tidbGlueSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}
// Open implements glue.Glue
func (gs *tidbGlueSession) Open(string, pd.SecurityOption) (kv.Storage, error) {
return gs.se.GetStore(), nil
}
// OwnsStorage implements glue.Glue
func (*tidbGlueSession) OwnsStorage() bool {
return false
}
// StartProgress implements glue.Glue
func (gs *tidbGlueSession) StartProgress(_ context.Context, cmdName string, total int64, _ bool) glue.Progress {
gs.progress.lock.Lock()
gs.progress.cmd = cmdName
gs.progress.total = total
atomic.StoreInt64(&gs.progress.current, 0)
gs.progress.lock.Unlock()
return gs.progress
}
// Record implements glue.Glue
func (gs *tidbGlueSession) Record(name string, value uint64) {
switch name {
case "BackupTS":
gs.info.backupTS = value
case "RestoreTS":
gs.info.restoreTS = value
case "Size":
gs.info.archiveSize = value
}
}
func (*tidbGlueSession) GetVersion() string {
return "TiDB\n" + printer.GetTiDBInfo()
}
// UseOneShotSession implements glue.Glue
func (gs *tidbGlueSession) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Session) error) error {
// in SQL backup. we don't need to close domain.
return fn(gs)
// GetSessionCtx implements glue.Glue
func (gs *tidbGlueSession) GetSessionCtx() sessionctx.Context {
return gs.se
}
func restoreQuery(stmt *ast.BRIEStmt) string {

View File

@ -36,7 +36,7 @@ import (
)
func TestGlueGetVersion(t *testing.T) {
g := tidbGlueSession{}
g := tidbGlue{}
version := g.GetVersion()
require.Contains(t, version, `Release Version`)
require.Contains(t, version, `Git Commit Hash`)

183
pkg/executor/brie_utils.go Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bytes"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"go.uber.org/zap"
)
const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
)
// SplitBatchCreateTableForTest is only used for test.
var SplitBatchCreateTableForTest = splitBatchCreateTable
// showRestoredCreateDatabase builds the SHOW CREATE DATABASE text for db.
// When brComment is non-empty it is written first, so the returned statement
// starts with that comment. Errors from ConstructResultOfShowCreateDatabase
// are returned traced, together with an empty string.
func showRestoredCreateDatabase(sctx sessionctx.Context, db *model.DBInfo, brComment string) (string, error) {
	buf := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
	if brComment != "" {
		// bytes.Buffer.WriteString always returns a nil error.
		_, _ = buf.WriteString(brComment)
	}
	err := ConstructResultOfShowCreateDatabase(sctx, db, true, buf)
	if err != nil {
		return "", errors.Trace(err)
	}
	return buf.String(), nil
}
// BRIECreateDatabase creates the database described by schema using the
// OnExistIgnore option.
//
// While the DDL runs, the session's QueryString is set to the generated
// CREATE DATABASE text (prefixed with brComment when non-empty); the previous
// QueryString is put back before returning. The caller's schema is not
// modified: a clone is used, and an empty charset on the clone is replaced
// with mysql.DefaultCharset.
func BRIECreateDatabase(sctx sessionctx.Context, schema *model.DBInfo, brComment string) error {
	ddlExec := domain.GetDomain(sctx).DDL()
	createSQL, err := showRestoredCreateDatabase(sctx, schema, brComment)
	if err != nil {
		return errors.Trace(err)
	}

	// Swap in the generated statement and restore the old one on exit.
	prevQuery := sctx.Value(sessionctx.QueryString)
	defer sctx.SetValue(sessionctx.QueryString, prevQuery)
	sctx.SetValue(sessionctx.QueryString, createSQL)

	dbInfo := schema.Clone()
	if len(dbInfo.Charset) == 0 {
		dbInfo.Charset = mysql.DefaultCharset
	}
	return ddlExec.CreateSchemaWithInfo(sctx, dbInfo, ddl.OnExistIgnore)
}
// showRestoredCreateTable builds the SHOW CREATE TABLE text for tbl.
// When brComment is non-empty it is written first, so the returned statement
// starts with that comment. An empty autoid.Allocators value is passed to
// ConstructResultOfShowCreateTable; its error, if any, is returned as-is.
func showRestoredCreateTable(sctx sessionctx.Context, tbl *model.TableInfo, brComment string) (string, error) {
	buf := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
	if brComment != "" {
		// bytes.Buffer.WriteString always returns a nil error.
		_, _ = buf.WriteString(brComment)
	}
	err := ConstructResultOfShowCreateTable(sctx, tbl, autoid.Allocators{}, buf)
	if err != nil {
		return "", err
	}
	return buf.String(), nil
}
// BRIECreateTable creates a single table in dbName using the OnExistIgnore
// option, which is appended after the caller-supplied configuriers in cs.
//
// For the duration of the call the session's QueryString is set to the
// generated CREATE TABLE text (prefixed with brComment when non-empty) and
// foreign key checks are switched off; both are restored before returning.
// The DDL operates on a clone of table.
func BRIECreateTable(
	sctx sessionctx.Context,
	dbName model.CIStr,
	table *model.TableInfo,
	brComment string,
	cs ...ddl.CreateTableWithInfoConfigurier,
) error {
	ddlExec := domain.GetDomain(sctx).DDL()
	createSQL, err := showRestoredCreateTable(sctx, table, brComment)
	if err != nil {
		return err
	}

	prevQuery := sctx.Value(sessionctx.QueryString)
	sctx.SetValue(sessionctx.QueryString, createSQL)
	// Foreign key checks are disabled while the table is being created.
	prevFKChecks := sctx.GetSessionVars().ForeignKeyChecks
	sctx.GetSessionVars().ForeignKeyChecks = false
	defer func() {
		sctx.SetValue(sessionctx.QueryString, prevQuery)
		sctx.GetSessionVars().ForeignKeyChecks = prevFKChecks
	}()

	tblInfo := table.Clone()
	opts := append(cs, ddl.OnExistIgnore)
	return ddlExec.CreateTableWithInfo(sctx, dbName, tblInfo, opts...)
}
// BRIECreateTables creates tables in batch, one batch per database, using the
// OnExistIgnore option. tables maps a database name to the tables to create
// in it.
//
// Foreign key checks are switched off for the whole call, and for each
// database the session's QueryString is set to the concatenation of the
// generated CREATE TABLE statements (each followed by ";"). The original
// QueryString and foreign-key setting are restored before returning.
// Processing stops at the first error.
func BRIECreateTables(
	sctx sessionctx.Context,
	tables map[string][]*model.TableInfo,
	brComment string,
	cs ...ddl.CreateTableWithInfoConfigurier,
) error {
	// Foreign key checks are disabled while the batches are being created.
	prevFKChecks := sctx.GetSessionVars().ForeignKeyChecks
	sctx.GetSessionVars().ForeignKeyChecks = false
	prevQuery := sctx.Value(sessionctx.QueryString)
	defer func() {
		sctx.SetValue(sessionctx.QueryString, prevQuery)
		sctx.GetSessionVars().ForeignKeyChecks = prevFKChecks
	}()

	for db, tblInfos := range tables {
		var stmts strings.Builder
		cloned := make([]*model.TableInfo, 0, len(tblInfos))
		for _, tbl := range tblInfos {
			createSQL, err := showRestoredCreateTable(sctx, tbl, brComment)
			if err != nil {
				return errors.Trace(err)
			}
			stmts.WriteString(createSQL)
			stmts.WriteString(";")
			cloned = append(cloned, tbl.Clone())
		}

		sctx.SetValue(sessionctx.QueryString, stmts.String())
		err := splitBatchCreateTable(sctx, model.NewCIStr(db), cloned, cs...)
		if err != nil {
			// This can fail when TiDB does not support model.ActionCreateTables;
			// BatchCreateTableWithInfo then returns errno.ErrInvalidDDLJob.
			// The error is only logged and returned here, so any fallback to
			// creating tables one by one has to be done by the caller.
			log.Warn("batch create table from tidb failure", zap.Error(err))
			return err
		}
	}
	return nil
}
// splitBatchCreateTable creates the tables in infos with a single batch DDL
// and, when that batch is rejected with kv.ErrEntryTooLarge, retries by
// recursively splitting it into two halves.
//
// A raft entry is limited to 6 MB, and a large batch of CreateTables may hit
// that limit. A batch that can no longer be split (zero or one table) returns
// the error unchanged; the <= 1 guard also keeps an empty batch from
// recursing forever. Any other error is returned as-is.
//
// TODO: should the query string be set for each split batch? It does not seem
// to matter if it is set once for all of them.
func splitBatchCreateTable(sctx sessionctx.Context, schema model.CIStr,
	infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
	d := domain.GetDomain(sctx).DDL()
	err := d.BatchCreateTableWithInfo(sctx, schema, infos, append(cs, ddl.OnExistIgnore)...)
	if !kv.ErrEntryTooLarge.Equal(err) {
		// Success, or a failure that splitting cannot help with.
		return err
	}

	log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
	if len(infos) <= 1 {
		return err
	}
	mid := len(infos) / 2
	if err := splitBatchCreateTable(sctx, schema, infos[:mid], cs...); err != nil {
		return err
	}
	return splitBatchCreateTable(sctx, schema, infos[mid:], cs...)
}

View File

@ -0,0 +1,322 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor_test
import (
"context"
"fmt"
"strconv"
"testing"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
// TestSplitBatchCreateTableWithTableId checks how batch table creation treats
// table IDs: they are kept when the AllocTableIDIf predicate returns false and
// freshly allocated when it returns true. It also checks that the session's
// QueryString is left untouched by the call, and that an empty batch succeeds.
func TestSplitBatchCreateTableWithTableId(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	for _, name := range []string{"table_id_resued1", "table_id_resued2", "table_id_new"} {
		tk.MustExec("drop table if exists " + name)
	}
	require.NotNil(t, dom.DDL())

	schema := model.NewCIStr("test")
	keepID := ddl.AllocTableIDIf(func(*model.TableInfo) bool { return false })
	allocID := ddl.AllocTableIDIf(func(*model.TableInfo) bool { return true })
	tableIDQuery := func(name string) string {
		return "select tidb_table_id from information_schema.tables where table_name = '" + name + "'"
	}
	sctx := tk.Session()

	// Case 1: the table IDs from the table infos are kept.
	reused := []*model.TableInfo{
		{ID: 124, Name: model.NewCIStr("table_id_resued1")},
		{ID: 125, Name: model.NewCIStr("table_id_resued2")},
	}
	sctx.SetValue(sessionctx.QueryString, "skip")
	err := executor.SplitBatchCreateTableForTest(sctx, schema, reused, keepID)
	require.NoError(t, err)
	require.Equal(t, "skip", sctx.Value(sessionctx.QueryString))
	tk.MustQuery(tableIDQuery("table_id_resued1")).Check(testkit.Rows("124"))
	tk.MustQuery(tableIDQuery("table_id_resued2")).Check(testkit.Rows("125"))

	// Case 2: a new table ID is allocated. Take a global ID first so the
	// allocated one can be compared against it.
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
	var globalID int64
	err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
		var genErr error
		globalID, genErr = meta.NewMeta(txn).GenGlobalID()
		return genErr
	})
	require.NoError(t, err)

	fresh := []*model.TableInfo{
		{ID: 124, Name: model.NewCIStr("table_id_new")},
	}
	sctx.SetValue(sessionctx.QueryString, "skip")
	err = executor.SplitBatchCreateTableForTest(sctx, schema, fresh, allocID)
	require.NoError(t, err)
	require.Equal(t, "skip", sctx.Value(sessionctx.QueryString))

	rows := tk.MustQuery(tableIDQuery("table_id_new")).Rows()
	idText, ok := rows[0][0].(string)
	require.True(t, ok)
	newID, err := strconv.ParseInt(idText, 10, 64)
	require.NoError(t, err)
	require.Greater(t, newID, globalID)

	// Case 3: an empty batch succeeds and leaves the query string unchanged.
	empty := []*model.TableInfo{}
	queryBefore := sctx.Value(sessionctx.QueryString)
	err = executor.SplitBatchCreateTableForTest(sctx, schema, empty, keepID)
	require.NoError(t, err)
	require.Equal(t, queryBefore, sctx.Value(sessionctx.QueryString))
}
// TestSplitBatchCreateTable creates three tables through
// executor.SplitBatchCreateTableForTest while the
// RestoreBatchCreateTableEntryTooLarge failpoint is enabled with "return(1)".
// It then verifies that:
//   - the session's QueryString value is left untouched,
//   - all three tables exist,
//   - the three most recent DDL jobs are "create tables" jobs, one per table,
//   - each table kept the ID supplied in its TableInfo.
func TestSplitBatchCreateTable(t *testing.T) {
	const entryTooLargeFP = "github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// Drop the tables this test actually creates (tables_N, not table_N).
	tk.MustExec("drop table if exists tables_1")
	tk.MustExec("drop table if exists tables_2")
	tk.MustExec("drop table if exists tables_3")

	d := dom.DDL()
	require.NotNil(t, d)

	infos := []*model.TableInfo{
		{ID: 1234, Name: model.NewCIStr("tables_1")},
		{ID: 1235, Name: model.NewCIStr("tables_2")},
		{ID: 1236, Name: model.NewCIStr("tables_3")},
	}

	sctx := tk.Session()

	// keep/reused table id verification
	sctx.SetValue(sessionctx.QueryString, "skip")
	require.NoError(t, failpoint.Enable(entryTooLargeFP, "return(1)"))
	// Disable via defer so the failpoint cannot stay enabled for later tests
	// when one of the assertions below aborts this test.
	defer func() {
		require.NoError(t, failpoint.Disable(entryTooLargeFP))
	}()

	err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
		return false
	}))
	require.NoError(t, err)
	require.Equal(t, "skip", sctx.Value(sessionctx.QueryString))

	tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3"))
	jobs := tk.MustQuery("admin show ddl jobs").Rows()
	require.Greater(t, len(jobs), 3)

	// The first three rows are expected in reverse creation order:
	// tables_3, then tables_2, then tables_1.
	for i, wantTable := range []string{"tables_3", "tables_2", "tables_1"} {
		job := jobs[i]
		require.Equal(t, "test", job[1])
		require.Equal(t, wantTable, job[2])
		require.Equal(t, "create tables", job[3])
		require.Equal(t, "public", job[4])
	}

	// check reused table id
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").
		Check(testkit.Rows("1234"))
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").
		Check(testkit.Rows("1235"))
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").
		Check(testkit.Rows("1236"))
}
// TestSplitBatchCreateTableFailWithEntryTooLarge enables the
// RestoreBatchCreateTableEntryTooLarge failpoint with "return(0)" and expects
// executor.SplitBatchCreateTableForTest to fail with kv.ErrEntryTooLarge while
// leaving the session's QueryString value untouched.
func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) {
	const entryTooLargeFP = "github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// Drop the tables this test tries to create (tables_N, not table_N).
	tk.MustExec("drop table if exists tables_1")
	tk.MustExec("drop table if exists tables_2")
	tk.MustExec("drop table if exists tables_3")

	d := dom.DDL()
	require.NotNil(t, d)

	infos := []*model.TableInfo{
		{Name: model.NewCIStr("tables_1")},
		{Name: model.NewCIStr("tables_2")},
		{Name: model.NewCIStr("tables_3")},
	}

	sctx := tk.Session()
	sctx.SetValue(sessionctx.QueryString, "skip")

	require.NoError(t, failpoint.Enable(entryTooLargeFP, "return(0)"))
	// Disable via defer so the failpoint cannot stay enabled for later tests
	// when one of the assertions below aborts this test.
	defer func() {
		require.NoError(t, failpoint.Disable(entryTooLargeFP))
	}()

	err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool {
		return true
	}))
	require.Equal(t, "skip", sctx.Value(sessionctx.QueryString))
	require.True(t, kv.ErrEntryTooLarge.Equal(err))
}
// TestBRIECreateDatabase creates two databases through
// executor.BRIECreateDatabase, once with a comment string and once with an
// empty one. It checks that both calls succeed, that the session's QueryString
// value is the same as before the calls, and that both databases can be
// selected with "use".
func TestBRIECreateDatabase(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	// Drop both databases this test creates; the second statement previously
	// repeated db_1 and never covered db_2.
	tk.MustExec("drop database if exists db_1")
	tk.MustExec("drop database if exists db_2")

	d := dom.DDL()
	require.NotNil(t, d)

	sctx := tk.Session()
	originQueryString := sctx.Value(sessionctx.QueryString)

	schema1 := &model.DBInfo{
		ID:      1230,
		Name:    model.NewCIStr("db_1"),
		Charset: "utf8mb4",
		Collate: "utf8mb4_bin",
		State:   model.StatePublic,
	}
	err := executor.BRIECreateDatabase(sctx, schema1, "/* from test */")
	require.NoError(t, err)

	schema2 := &model.DBInfo{
		ID:      1240,
		Name:    model.NewCIStr("db_2"),
		Charset: "utf8mb4",
		Collate: "utf8mb4_bin",
		State:   model.StatePublic,
	}
	err = executor.BRIECreateDatabase(sctx, schema2, "")
	require.NoError(t, err)

	// The helper must leave the session's query string as it found it.
	require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString))

	tk.MustExec("use db_1")
	tk.MustExec("use db_2")
}
// mockTableInfo parses createSQL, which must be a single CREATE TABLE
// statement, builds a *model.TableInfo from it via ddl.MockTableInfo, and
// returns that info with its State set to model.StatePublic. Any parse or
// build error fails the test immediately.
func mockTableInfo(t *testing.T, sctx sessionctx.Context, createSQL string) *model.TableInfo {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	node, err := parser.New().ParseOneStmt(createSQL, "", "")
	require.NoError(t, err)

	// Fail with a readable message instead of panicking when the statement is
	// not a CREATE TABLE.
	stmt, ok := node.(*ast.CreateTableStmt)
	require.True(t, ok, "expected a CREATE TABLE statement, got %T", node)

	info, err := ddl.MockTableInfo(sctx, stmt, 1)
	require.NoError(t, err)
	info.State = model.StatePublic
	return info
}
// TestBRIECreateTable creates table_1 and table_2 through
// executor.BRIECreateTable, reusing one TableInfo whose ID and name are
// changed between the two calls. The first call passes a comment string and
// the second an empty one. Afterwards the session's QueryString value must be
// the same as before, and both tables must be describable.
func TestBRIECreateTable(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	for _, name := range []string{"table_1", "table_2"} {
		tk.MustExec("drop table if exists " + name)
	}
	require.NotNil(t, dom.DDL())

	sctx := tk.Session()
	queryBefore := sctx.Value(sessionctx.QueryString)
	db := model.NewCIStr("test")
	info := mockTableInfo(t, sctx, "create table test.table_1 (a int primary key, b json, c varchar(20))")

	// First table: table_1 with an explicit ID and a comment string.
	info.ID = 1230
	require.NoError(t, executor.BRIECreateTable(sctx, db, info, "/* from test */"))

	// Second table: same definition under a new ID and name, empty comment.
	info.ID, info.Name = 1240, model.NewCIStr("table_2")
	require.NoError(t, executor.BRIECreateTable(sctx, db, info, ""))

	require.Equal(t, queryBefore, sctx.Value(sessionctx.QueryString))
	for _, name := range []string{"table_1", "table_2"} {
		tk.MustExec("desc " + name)
	}
}
// TestBRIECreateTables builds 100 table infos (table_0 .. table_99, IDs
// starting at 1230) and creates them with a single executor.BRIECreateTables
// call for database "test". It then checks that the session's QueryString
// value is the same as before the call and that every table can be described.
func TestBRIECreateTables(t *testing.T) {
	const tableCount = 100

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	for i := 0; i < tableCount; i++ {
		tk.MustExec(fmt.Sprintf("drop table if exists table_%d", i))
	}
	require.NotNil(t, dom.DDL())

	sctx := tk.Session()
	queryBefore := sctx.Value(sessionctx.QueryString)

	infos := make([]*model.TableInfo, 0, tableCount)
	for i := 0; i < tableCount; i++ {
		createSQL := fmt.Sprintf("create table test.table_%d (a int primary key, b json, c varchar(20))", i)
		info := mockTableInfo(t, sctx, createSQL)
		info.ID = 1230 + int64(i)
		infos = append(infos, info)
	}

	batch := map[string][]*model.TableInfo{"test": infos}
	require.NoError(t, executor.BRIECreateTables(sctx, batch, "/* from test */"))
	require.Equal(t, queryBefore, sctx.Value(sessionctx.QueryString))

	for i := 0; i < tableCount; i++ {
		tk.MustExec(fmt.Sprintf("desc table_%d", i))
	}
}

View File

@ -15,8 +15,10 @@
package brietest
import (
"fmt"
"os"
"path"
"strings"
"testing"
"github.com/pingcap/tidb/pkg/config"
@ -70,3 +72,46 @@ func TestBackupAndRestore(t *testing.T) {
tk.MustQuery("select count(*) from t1").Check(testkit.Rows("3"))
tk.MustExec("drop database br")
}
// TestRestoreMultiTables backs up a database containing 1000 single-row
// tables, drops the database, restores it from the backup and then verifies
// that:
//   - every table name listed in the "create tables" DDL jobs belongs to the
//     set of created tables, and the total number of listed names is 1000,
//   - every restored table still holds exactly one row.
func TestRestoreMultiTables(t *testing.T) {
	tk := initTestKit(t)
	tk.MustExec("create database if not exists br")
	tk.MustExec("use br")

	const tableNum = 1000
	tablesNameSet := make(map[string]struct{}, tableNum)
	for i := 0; i < tableNum; i++ {
		tk.MustExec(fmt.Sprintf("create table table_%d (a int primary key, b json, c varchar(20))", i))
		tk.MustExec(fmt.Sprintf("insert into table_%d values (1, '{\"a\": 1, \"b\": 2}', '123')", i))
		tk.MustQuery(fmt.Sprintf("select count(*) from table_%d", i)).Check(testkit.Rows("1"))
		tablesNameSet[fmt.Sprintf("table_%d", i)] = struct{}{}
	}

	tmpDir := path.Join(os.TempDir(), "bk1")
	require.NoError(t, os.RemoveAll(tmpDir))
	// Best-effort removal of the backup data once the test is over so it does
	// not pile up in the system temp directory.
	t.Cleanup(func() {
		if err := os.RemoveAll(tmpDir); err != nil {
			t.Logf("removing backup dir %q: %v", tmpDir, err)
		}
	})

	// backup database to tmp dir
	tk.MustQuery("backup database br to 'local://" + tmpDir + "'")

	// remove database for recovery
	tk.MustExec("drop database br")

	// restore database with backup data
	tk.MustQuery("restore database * from 'local://" + tmpDir + "'")

	tk.MustExec("use br")
	ddlCreateTablesRows := tk.MustQuery("admin show ddl jobs where JOB_TYPE = 'create tables'").Rows()
	cnt := 0
	for _, row := range ddlCreateTablesRows {
		// Column 2 is read as a comma-separated list of table names.
		tables, ok := row[2].(string)
		require.True(t, ok, "unexpected type %T for table name column", row[2])
		require.NotEqual(t, "", tables)
		for _, table := range strings.Split(tables, ",") {
			_, ok := tablesNameSet[table]
			require.True(t, ok)
			cnt++
		}
	}
	require.Equal(t, tableNum, cnt)

	for i := 0; i < tableNum; i++ {
		tk.MustQuery(fmt.Sprintf("select count(*) from table_%d", i)).Check(testkit.Rows("1"))
	}
	tk.MustExec("drop database br")
}